Documentation
¶
Index ¶
- type BaseProducer
- type BaseProducerOption
- type Consumer
- type ConsumerErrorHandler
- type ConsumerHandler
- type ConsumerOption
- func ConsumerConcurrency[T any](concurrency uint) ConsumerOption[T]
- func ConsumerDynamicTopicsDiscovery[T any]() ConsumerOption[T]
- func ConsumerDynamicTopicsDiscoveryInterval[T any](interval time.Duration) ConsumerOption[T]
- func ConsumerInitialPartitionsCount[T any](count uint) ConsumerOption[T]
- func ConsumerTopicNamesExactMatch[T any]() ConsumerOption[T]
- func ConsumerTopicNamesRegexMatch[T any]() ConsumerOption[T]
- func ConsumerWithErrorLogger[T any](logger kafka.Logger) ConsumerOption[T]
- func ConsumerWithLogger[T any](logger kafka.Logger) ConsumerOption[T]
- func ConsumerWithMaxBlockingTasks[T any](count uint) ConsumerOption[T]
- func ConsumerWithOnFailCommitHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]
- func ConsumerWithReadMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]
- func ConsumerWithReaderConfig[T any](config kafka.ReaderConfig) ConsumerOption[T]
- func ConsumerWithTopicsListUpdatedHandler[T any](handler ConsumerTopicsListUpdatedHandler[T]) ConsumerOption[T]
- func ConsumerWithWrongMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]
- type ConsumerTopicsListUpdatedHandler
- type ErrorHandler
- type LoggerContainer
- type Producer
- type Stream
- type StreamOption
- func StreamWithConsumerOptions[T any](opts ...ConsumerOption[T]) StreamOption[T]
- func StreamWithErrorLogger[T any](logger kafka.Logger) StreamOption[T]
- func StreamWithLogger[T any](logger kafka.Logger) StreamOption[T]
- func StreamWithParallelJobs[T any](parallelJobs uint) StreamOption[T]
- func StreamWithProducerOptions[T any](opts ...BaseProducerOption) StreamOption[T]
- type StreamRoutingRule
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseProducer ¶ added in v0.0.10
type BaseProducer struct { LoggerContainer // contains filtered or unexported fields }
BaseProducer is a wrapper around kafka.Writer
func InitBaseProducer ¶ added in v0.0.10
func InitBaseProducer( brokers []string, opts ...BaseProducerOption, ) (producer *BaseProducer, close func() error)
InitBaseProducer creates a new producer instance
type BaseProducerOption ¶ added in v0.0.10
type BaseProducerOption func(kafka *BaseProducer) error
BaseProducerOption is a function that sets some option on the producer
func BaseProducerInitialPartitionsCount ¶ added in v0.0.12
func BaseProducerInitialPartitionsCount(count uint) BaseProducerOption
BaseProducerInitialPartitionsCount sets initial partitions count
func BaseProducerWithErrorLogger ¶ added in v0.0.13
func BaseProducerWithErrorLogger(logger kafka.Logger) BaseProducerOption
BaseProducerWithErrorLogger sets error logger
func BaseProducerWithLogger ¶ added in v0.0.13
func BaseProducerWithLogger(logger kafka.Logger) BaseProducerOption
BaseProducerWithLogger sets logger
func BaseProducerWithWriterConfig ¶ added in v0.0.12
func BaseProducerWithWriterConfig(writer *kafka.Writer) BaseProducerOption
BaseProducerWithWriterConfig sets the writer configuration from the given *kafka.Writer
type Consumer ¶
type Consumer[T any] struct { LoggerContainer // contains filtered or unexported fields }
Consumer is a wrapper around kafka.Reader
func InitConsumer ¶
func InitConsumer[T any]( brokers []string, topicsList []string, groupId string, opts ...ConsumerOption[T], ) (consumer *Consumer[T], close func() error)
InitConsumer creates a new consumer instance
type ConsumerErrorHandler ¶
type ConsumerHandler ¶
ConsumerHandler is a function that handles messages
type ConsumerOption ¶
ConsumerOption is a function that sets some option on the consumer
func ConsumerConcurrency ¶
func ConsumerConcurrency[T any](concurrency uint) ConsumerOption[T]
ConsumerConcurrency sets the number of parallel tasks
func ConsumerDynamicTopicsDiscovery ¶ added in v0.0.8
func ConsumerDynamicTopicsDiscovery[T any]() ConsumerOption[T]
ConsumerDynamicTopicsDiscovery enables dynamic topics discovery
func ConsumerDynamicTopicsDiscoveryInterval ¶ added in v0.0.9
func ConsumerDynamicTopicsDiscoveryInterval[T any](interval time.Duration) ConsumerOption[T]
ConsumerDynamicTopicsDiscoveryInterval sets dynamic topics discovery interval
func ConsumerInitialPartitionsCount ¶
func ConsumerInitialPartitionsCount[T any](count uint) ConsumerOption[T]
ConsumerInitialPartitionsCount sets initial partitions count
func ConsumerTopicNamesExactMatch ¶ added in v0.0.10
func ConsumerTopicNamesExactMatch[T any]() ConsumerOption[T]
ConsumerTopicNamesExactMatch enables exact matching of topic names
func ConsumerTopicNamesRegexMatch ¶ added in v0.0.10
func ConsumerTopicNamesRegexMatch[T any]() ConsumerOption[T]
ConsumerTopicNamesRegexMatch enables regex matching of topic names
func ConsumerWithErrorLogger ¶ added in v0.0.13
func ConsumerWithErrorLogger[T any](logger kafka.Logger) ConsumerOption[T]
ConsumerWithErrorLogger sets error logger
func ConsumerWithLogger ¶ added in v0.0.13
func ConsumerWithLogger[T any](logger kafka.Logger) ConsumerOption[T]
ConsumerWithLogger sets logger
func ConsumerWithMaxBlockingTasks ¶ added in v0.0.8
func ConsumerWithMaxBlockingTasks[T any](count uint) ConsumerOption[T]
ConsumerWithMaxBlockingTasks sets max blocking tasks
func ConsumerWithOnFailCommitHandler ¶ added in v0.0.8
func ConsumerWithOnFailCommitHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]
ConsumerWithOnFailCommitHandler sets the handler for commit errors
func ConsumerWithReadMessageHandler ¶ added in v0.0.8
func ConsumerWithReadMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]
ConsumerWithReadMessageHandler sets the error handler (a ConsumerErrorHandler) for reading messages
func ConsumerWithReaderConfig ¶ added in v0.0.8
func ConsumerWithReaderConfig[T any](config kafka.ReaderConfig) ConsumerOption[T]
ConsumerWithReaderConfig sets reader config
func ConsumerWithTopicsListUpdatedHandler ¶ added in v0.0.8
func ConsumerWithTopicsListUpdatedHandler[T any](handler ConsumerTopicsListUpdatedHandler[T]) ConsumerOption[T]
ConsumerWithTopicsListUpdatedHandler sets the handler for topics list updates
func ConsumerWithWrongMessageHandler ¶ added in v0.0.8
func ConsumerWithWrongMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T]
ConsumerWithWrongMessageHandler sets the error handler (a ConsumerErrorHandler) for wrong messages
type ConsumerTopicsListUpdatedHandler ¶ added in v0.0.8
type ErrorHandler ¶
ErrorHandler is a function that handles errors
type LoggerContainer ¶ added in v0.0.13
type LoggerContainer struct {
// contains filtered or unexported fields
}
type Producer ¶
type Producer[T any] struct { *BaseProducer }
Producer is a wrapper around BaseProducer
func InitProducer ¶
func InitProducer[T any]( brokers []string, opts ...BaseProducerOption, ) (producer *Producer[T], close func() error)
InitProducer initializes a new Producer instance
type Stream ¶ added in v0.0.10
type Stream[T any] struct { LoggerContainer // contains filtered or unexported fields }
Stream is a wrapper around Consumer and Producer
func InitStream ¶ added in v0.0.10
func InitStream[T any]( brokers []string, topicsList []string, groupId string, opts ...StreamOption[T], ) (*Stream[T], func() error)
InitStream initializes a new Stream instance
type StreamOption ¶ added in v0.0.10
StreamOption is a function that modifies a Stream instance
func StreamWithConsumerOptions ¶ added in v0.0.10
func StreamWithConsumerOptions[T any](opts ...ConsumerOption[T]) StreamOption[T]
StreamWithConsumerOptions sets consumer options
func StreamWithErrorLogger ¶ added in v0.0.13
func StreamWithErrorLogger[T any](logger kafka.Logger) StreamOption[T]
StreamWithErrorLogger sets error logger
func StreamWithLogger ¶ added in v0.0.13
func StreamWithLogger[T any](logger kafka.Logger) StreamOption[T]
StreamWithLogger sets logger
func StreamWithParallelJobs ¶ added in v0.0.15
func StreamWithParallelJobs[T any](parallelJobs uint) StreamOption[T]
StreamWithParallelJobs sets parallel jobs count for stream consumer
func StreamWithProducerOptions ¶ added in v0.0.10
func StreamWithProducerOptions[T any](opts ...BaseProducerOption) StreamOption[T]
StreamWithProducerOptions sets producer options
type StreamRoutingRule ¶ added in v0.0.10
type StreamRoutingRule[T any] func(message *T, kafkaMessage *kafka.Message) (newMessage *kafka.Message, err error)
StreamRoutingRule is a function that takes a message of type T together with its kafka.Message and returns a new kafka.Message or an error