Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var NewConsumerWrapper = func(configMap *kafka.ConfigMap) (ConsumerInterface, error) {
	return kafka.NewConsumer(configMap)
}
NewConsumerWrapper is a function variable that wraps kafka.NewConsumer so that unit tests can replace it with a mock.
Functions ¶
This section is empty.
Types ¶
type ConsumerInterface ¶
type ConsumerInterface interface {
	Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
	Poll(int) kafka.Event
	CommitMessage(*kafka.Message) ([]kafka.TopicPartition, error)
	Close() error
	OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
	StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
	Commit() ([]kafka.TopicPartition, error)
}
The Confluent client does not code to interfaces or provide mocks, so ConsumerInterface wraps the consumer methods this package uses to make them testable.
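To show why coding against an interface helps testing, here is a minimal sketch of a fake that returns scripted events. It is not the package's own mock: `Event` and `Message` are stand-ins for kafka.Event and kafka.Message, `Poller` is a hypothetical two-method subset of ConsumerInterface, and its CommitMessage is simplified to return only an error (the real method also returns []kafka.TopicPartition).

```go
package main

import "fmt"

// Event and Message are hypothetical stand-ins for kafka.Event and kafka.Message.
type Event interface{}

type Message struct{ Value []byte }

// Poller is a hypothetical, simplified subset of ConsumerInterface.
type Poller interface {
	Poll(timeoutMs int) Event
	CommitMessage(m *Message) error
}

// fakePoller hands out scripted events in order, then returns nil,
// and records every message it was asked to commit.
type fakePoller struct {
	events    []Event
	committed []*Message
}

func (f *fakePoller) Poll(int) Event {
	if len(f.events) == 0 {
		return nil
	}
	ev := f.events[0]
	f.events = f.events[1:]
	return ev
}

func (f *fakePoller) CommitMessage(m *Message) error {
	f.committed = append(f.committed, m)
	return nil
}

// drain is hypothetical code under test: it polls until Poll returns nil,
// commits each message, and returns the payloads it saw. Stopping at nil is
// a simplification for the example; a real loop would usually keep polling
// after a timeout.
func drain(p Poller) ([]string, error) {
	var out []string
	for {
		ev := p.Poll(100)
		if ev == nil {
			return out, nil
		}
		// Non-message events are skipped.
		if m, ok := ev.(*Message); ok {
			out = append(out, string(m.Value))
			if err := p.CommitMessage(m); err != nil {
				return out, err
			}
		}
	}
}

func main() {
	f := &fakePoller{events: []Event{
		&Message{Value: []byte("a")},
		"not a message",
		&Message{Value: []byte("b")},
	}}
	got, err := drain(f)
	fmt.Println(got, len(f.committed), err)
}
```

Because `drain` depends only on the interface, the same function could be handed a real consumer in production and the fake in tests.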
func CreateConsumer ¶
func CreateConsumer(brokers string, groupId string, offset string, username string, password string) (ConsumerInterface, error)
CreateConsumer creates a Kafka consumer, with optional SASL authentication.
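The page does not show how CreateConsumer turns its arguments into client configuration, so the following is only a guess at the general shape. `buildConfig` is a hypothetical helper, not part of this package. The keys are standard librdkafka configuration properties, but mapping `offset` to `auto.offset.reset`, enabling SASL only when both credentials are non-empty, and the choice of `SASL_SSL` with the `PLAIN` mechanism are all assumptions.

```go
package main

import "fmt"

// buildConfig is a hypothetical helper that assembles consumer settings.
// A plain map is used in place of kafka.ConfigMap so the example needs
// only the standard library.
func buildConfig(brokers, groupId, offset, username, password string) map[string]interface{} {
	cfg := map[string]interface{}{
		"bootstrap.servers": brokers,
		"group.id":          groupId,
		"auto.offset.reset": offset, // assumed meaning of the offset argument
	}
	// Assumption: SASL settings are added only when credentials are supplied.
	if username != "" && password != "" {
		cfg["security.protocol"] = "SASL_SSL" // assumed protocol
		cfg["sasl.mechanisms"] = "PLAIN"      // assumed mechanism
		cfg["sasl.username"] = username
		cfg["sasl.password"] = password
	}
	return cfg
}

func main() {
	// Placeholder broker address, group name, and credentials.
	plain := buildConfig("localhost:9092", "example-group", "earliest", "", "")
	secure := buildConfig("localhost:9092", "example-group", "earliest", "user", "secret")
	fmt.Println(len(plain), len(secure))
}
```

A caller of the real function would pass the same five strings to CreateConsumer and receive a ConsumerInterface, leaving the username and password empty when the cluster does not require authentication, if the assumption above holds.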