Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AsCloudEvent ¶ added in v0.3.0
func AsCloudEvent(message kafka.Message) (cloudevents.Event, error)
AsCloudEvent is a helper function that unmarshals a Kafka Message into a CloudEvent.
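The decoding step behind such a helper can be sketched with the standard library alone. The `event` struct below is a minimal stand-in for the CloudEvents SDK's `cloudevents.Event` (the real AsCloudEvent returns the SDK type); `asCloudEvent` and its field set are illustrative assumptions, not the package's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a minimal stand-in for cloudevents.Event, used here only to
// illustrate the decoding step.
type event struct {
	SpecVersion string `json:"specversion"`
	ID          string `json:"id"`
	Type        string `json:"type"`
	Source      string `json:"source"`
}

// asCloudEvent sketches the idea: treat the Kafka message value as a
// JSON-encoded CloudEvent and unmarshal it.
func asCloudEvent(value []byte) (event, error) {
	var e event
	err := json.Unmarshal(value, &e)
	return e, err
}

func main() {
	value := []byte(`{"specversion":"1.0","id":"42","type":"demo.event","source":"demo"}`)
	e, err := asCloudEvent(value)
	if err != nil {
		panic(err)
	}
	fmt.Println(e.ID, e.Type) // 42 demo.event
}
```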
func DumpMessage ¶
DumpMessage is a simple handler function that can be used as a HandleMessageFunc; it dumps information about the received Kafka Message and the payload contained therein.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a high-level Kafka consumer client.
func NewClientFromEnv ¶
NewClientFromEnv delegates to NewClient and returns a properly configured, ready-to-use Client that invokes the callback function for every received message using the default KafkaConsumerTopic. Spec: see https://github.com/segmentio/kafka-go#reader-
func (*Client) Poll ¶ added in v0.3.0
func (c *Client) Poll(ctx context.Context, rc kafka.ReaderConfig, msgHandler HandleMessageFunc) error
Poll uses the kafka-go Reader, which automatically handles reconnections and offset management, and exposes an API that supports asynchronous cancellations and timeouts using Go contexts. See https://github.com/segmentio/kafka-go#reader- and this nice tutorial: https://www.sohamkamani.com/golang/working-with-kafka/
func (*Client) WaitForClose ¶ added in v0.3.0
WaitForClose blocks until the Consumer WaitGroup counter is zero, or the timeout is reached.
type ErrorLoggerWrapper ¶
type ErrorLoggerWrapper struct {
// contains filtered or unexported fields
}
func (ErrorLoggerWrapper) Printf ¶
func (l ErrorLoggerWrapper) Printf(format string, v ...interface{})
type HandleMessageFunc ¶
HandleMessageFunc: the consumer will pass received messages to a function that matches this type.
type LoggerWrapper ¶
type LoggerWrapper struct {
// contains filtered or unexported fields
}
LoggerWrapper wraps a zerolog logger so it can be used as the logger in a kafka-go ReaderConfig. Example:

r := kafka.NewReader(kafka.ReaderConfig{
	Logger: LoggerWrapper{delegate: k.logger},
})
func (LoggerWrapper) Printf ¶
func (l LoggerWrapper) Printf(format string, v ...interface{})
type MessageReader ¶
type MessageReader interface {
	ReadMessage(ctx context.Context) (kafka.Message, error)
	Close() error
}

MessageReader is an interface that makes it easy to mock the real kafka.Reader in Poll() for testing purposes.
type Options ¶
type Options struct {
	BootstrapServers string `required:"false" default:"localhost:9092" desc:"Kafka Bootstrap server(s)" split_words:"true"`
	// ProducerClientID string `required:"false" default:"kafkaClient" desc:"Client Id for Message Producer" split_words:"true"`
	ConsumerAPIKey     string `required:"false" default:"" desc:"Kafka API Key for consumer (user)" split_words:"true"`
	ConsumerAPISecret  string `required:"false" default:"" desc:"Kafka API Secret for consumer (password)" split_words:"true"`
	ConsumerGroupID    string `required:"false" default:"default" desc:"Used as default id for KafkaConsumerGroups" split_words:"true"`
	ConsumerMaxReceive int32  `required:"false" default:"-1" desc:"Max num of received messages, default -1 (unlimited), useful for dev" split_words:"true"`
	ConsumerStartLast  bool   `required:"false" default:"false" desc:"Whether to start consuming at the last offset (default: first)" split_words:"true"`
	Debug              bool   `default:"false" desc:"Debug mode, registers logger for kafka packages" split_words:"true"`
}

Options holds the Kafka context parameters, populated by envconfig in NewClientFromEnv...()
func NewOptionsFromEnv ¶
NewOptionsFromEnv uses environment configuration with the default prefix "kafka" to initialize Options.
func NewOptionsFromEnvWithPrefix ¶
NewOptionsFromEnvWithPrefix is the same as NewOptionsFromEnv but allows a custom prefix.
func (Options) StartOffset ¶
StartOffset provides the reader start offset depending on ConsumerStartLast (true == last, else first):

LastOffset  int64 = -1 // The most recent offset available for a partition.
FirstOffset int64 = -2 // The least recent offset available for a partition.
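The selection logic can be sketched self-contained; the offset constants and their values (-1, -2) are as defined by kafka-go, while `startOffset` is an illustrative stand-in for the Options.StartOffset method.

```go
package main

import "fmt"

// Offset sentinel values as defined by kafka-go.
const (
	LastOffset  int64 = -1 // the most recent offset available for a partition
	FirstOffset int64 = -2 // the least recent offset available for a partition
)

// startOffset sketches Options.StartOffset: ConsumerStartLast == true
// selects LastOffset, otherwise the reader starts at FirstOffset.
func startOffset(consumerStartLast bool) int64 {
	if consumerStartLast {
		return LastOffset
	}
	return FirstOffset
}

func main() {
	fmt.Println(startOffset(true), startOffset(false)) // -1 -2
}
```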