kafka

package
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2023 License: MIT Imports: 4 Imported by: 2

Documentation

Index

Constants

View Source
const (
	OffsetEarliest = "earliest"
	OffsetLatest   = "latest"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Brokers  []string         `yaml:"brokers"`
	Consumer ConsumerConfig   `yaml:"consumer"`
	Producer ProducerConfig   `yaml:"producer"`
	SASL     SASLConfig       `yaml:"sasl"`
	LogLevel logger.Level     `yaml:"logLevel"`
	Logger   logger.Interface `yaml:"-"`
	ClientID string           `yaml:"clientId"`
}

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate()

type ConsumeFn

type ConsumeFn func(message Message) error

ConsumeFn function describes how to consume messages from specified topic

type ConsumerConfig

type ConsumerConfig struct {
	ClientID          string        `yaml:"clientId"`
	GroupID           string        `yaml:"groupId"`
	Topic             string        `yaml:"topic"`
	DeadLetterTopic   string        `yaml:"deadLetterTopic"`
	MinBytes          int           `yaml:"minBytes"`
	MaxBytes          int           `yaml:"maxBytes"`
	MaxRetry          int           `yaml:"maxRetry"`
	MaxWait           time.Duration `yaml:"maxWait"`
	CommitInterval    time.Duration `yaml:"commitInterval"`
	HeartbeatInterval time.Duration `yaml:"heartbeatInterval"`
	SessionTimeout    time.Duration `yaml:"sessionTimeout"`
	RebalanceTimeout  time.Duration `yaml:"rebalanceTimeout"`
	StartOffset       Offset        `yaml:"startOffset"`
	RetentionTime     time.Duration `yaml:"retentionTime"`
	Concurrency       int           `yaml:"concurrency"`
	Duration          time.Duration `yaml:"duration"`
	Cron              string        `yaml:"cron"`
}

type Cronsumer

type Cronsumer interface {
	// Start starts the kafka consumer KafkaCronsumer with a new goroutine so its asynchronous operation (non-blocking)
	Start()

	// Run runs the kafka consumer KafkaCronsumer with the caller goroutine so its synchronous operation (blocking)
	Run()

	// Stop stops the cron and kafka KafkaCronsumer consumer
	Stop()

	// WithLogger for injecting custom log implementation
	WithLogger(logger logger.Interface)

	// Produce produces the message to kafka KafkaCronsumer producer. Offset and Time fields will be ignored in the message.
	Produce(message Message) error

	// ProduceBatch produces the list of messages to kafka KafkaCronsumer producer.
	ProduceBatch(messages []Message) error
}
type Header struct {
	Key   string
	Value []byte
}

type Message

type Message struct {
	Topic         string
	Partition     int
	Offset        int64
	HighWaterMark int64
	Key           []byte
	Value         []byte
	Headers       []Header
	Time          time.Time
}

type MessageBuilder added in v0.6.2

type MessageBuilder struct {
	// contains filtered or unexported fields
}

func NewMessageBuilder added in v0.6.2

func NewMessageBuilder() *MessageBuilder

func (*MessageBuilder) Build added in v0.6.2

func (mb *MessageBuilder) Build() Message

func (*MessageBuilder) WithHeaders added in v0.6.2

func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder

func (*MessageBuilder) WithHighWatermark added in v0.6.2

func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder

func (*MessageBuilder) WithKey added in v0.6.2

func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder

func (*MessageBuilder) WithPartition added in v0.6.2

func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder

func (*MessageBuilder) WithTopic added in v0.6.2

func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder

func (*MessageBuilder) WithValue added in v0.6.2

func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder

type Offset

type Offset string

func ToStringOffset added in v0.6.4

func ToStringOffset(offset int64) Offset

func (Offset) Value

func (o Offset) Value() int64

type ProducerConfig

type ProducerConfig struct {
	BatchSize    int           `yaml:"batchSize"`
	BatchTimeout time.Duration `yaml:"batchTimeout"`
}

type SASLConfig

type SASLConfig struct {
	Enabled            bool   `yaml:"enabled"`
	AuthType           string `yaml:"authType"` // plain or scram
	Username           string `yaml:"username"`
	Password           string `yaml:"password"`
	RootCAPath         string `yaml:"rootCAPath"`
	IntermediateCAPath string `yaml:"intermediateCAPath"`
	Rack               string `yaml:"rack"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳