kq

package
v0.0.0-...-4fb1835 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustNewQueue

func MustNewQueue(c KafkaConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue

func NewQueue

func NewQueue(c KafkaConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)

Types

type ConsumeHandle

// ConsumeHandle is a plain function that receives a key and a value as
// strings and returns an error. WithHandle converts a ConsumeHandle into a
// ConsumeHandler.
type ConsumeHandle func(key, value string) error

type ConsumeHandler

// ConsumeHandler is the handler type accepted by NewQueue and MustNewQueue.
// Implementations provide a single Consume method.
type ConsumeHandler interface {
	// Consume receives a key and a value as strings and returns an error.
	// NOTE(review): presumably called once per consumed message — confirm
	// against the queue implementation.
	Consume(key, value string) error
}

func WithHandle

func WithHandle(handle ConsumeHandle) ConsumeHandler

type KafkaConf

// KafkaConf is the configuration passed to NewQueue and MustNewQueue.
// It embeds service.ServiceConf and adds broker, consumer-group, topic,
// concurrency, fetch-size and credential settings. The json struct tags
// carry each field's declared default, whether it is optional, and (for
// Offset) the permitted values "first" and "last".
type KafkaConf struct {
	service.ServiceConf
	Brokers     []string
	Group       string
	Topic       string
	Offset      string `json:",options=first|last,default=last"`
	Conns       int    `json:",default=1"`
	Consumers   int    `json:",default=8"`
	Processors  int    `json:",default=8"`
	MinBytes    int    `json:",default=10240"`    // 10K
	MaxBytes    int    `json:",default=10485760"` // 10M
	Username    string `json:",optional"`
	Password    string `json:",optional"`
	ForceCommit bool   `json:",default=true"`
	// MaxWait is the maximum amount of time to wait for new data to arrive
	// when fetching batches.
	// NOTE(review): the time unit is not stated here — confirm before relying on it.
	MaxWait int `json:",default=10"`
	// ReadBatchTimeout is the amount of time to wait when fetching a message
	// from a Kafka message batch.
	// NOTE(review): the time unit is not stated here — confirm before relying on it.
	ReadBatchTimeout int `json:",default=10"`
	// MaxAttempts limits how many attempts are made before the error is delivered.
	MaxAttempts int `json:",default=3"`
}

type PushOption

// PushOption is a functional option that modifies a *chunkOptions value.
// NewPusher accepts a variadic list of PushOption; WithChunkSize and
// WithFlushInterval each return one.
type PushOption func(options *chunkOptions)

func WithChunkSize

func WithChunkSize(chunkSize int) PushOption

func WithFlushInterval

func WithFlushInterval(interval time.Duration) PushOption

type Pusher

// Pusher is returned by NewPusher, which takes a list of addresses, a topic
// and optional PushOption values. It exposes the Close, Name and Push methods.
type Pusher struct {
	// contains filtered or unexported fields
}

func NewPusher

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher

func (*Pusher) Close

func (p *Pusher) Close() error

func (*Pusher) Name

func (p *Pusher) Name() string

func (*Pusher) Push

func (p *Pusher) Push(v string) error

type QueueOption

// QueueOption is a functional option that modifies a *queueOptions value.
// NewQueue and MustNewQueue accept a variadic list of QueueOption;
// WithCommitInterval, WithMaxWait, WithMetrics and WithQueueCapacity each
// return one.
type QueueOption func(options *queueOptions)

func WithCommitInterval

func WithCommitInterval(interval time.Duration) QueueOption

func WithMaxWait

func WithMaxWait(wait time.Duration) QueueOption

func WithMetrics

func WithMetrics(metrics *stat.Metrics) QueueOption

func WithQueueCapacity

func WithQueueCapacity(queueCapacity int) QueueOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳