Documentation
¶
Index ¶
- func GetConsumerGroupName(deployment, jobName string, aresCluster string) string
- func NewKafkaConsumer(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error)
- type KafkaConsumer
- func (c *KafkaConsumer) Close() error
- func (c *KafkaConsumer) Closed() <-chan struct{}
- func (c *KafkaConsumer) CommitUpTo(msg consumer.Message) error
- func (c *KafkaConsumer) Errors() <-chan error
- func (c *KafkaConsumer) Messages() <-chan consumer.Message
- func (c *KafkaConsumer) Name() string
- func (c *KafkaConsumer) Topics() []string
- type KafkaMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetConsumerGroupName ¶
GetConsumerGroupName will return the consumer group name to use or being used for given deployment and job name
func NewKafkaConsumer ¶
func NewKafkaConsumer(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error)
NewKafkaConsumer creates kafka consumer by using https://github.com/confluentinc/confluent-kafka-go.
Types ¶
type KafkaConsumer ¶
type KafkaConsumer struct { *kafkaConfluent.Consumer kafkaConfluent.ConfigMap sync.Mutex TopicArray []string Logger *zap.Logger Scope tally.Scope ErrCh chan error MsgCh chan consumer.Message // WARNING: The following channels should not be closed by the lib users CloseAttempted bool CloseErr error CloseCh chan struct{} }
KafkaConsumer implements Consumer interface
func (*KafkaConsumer) Close ¶
func (c *KafkaConsumer) Close() error
func (*KafkaConsumer) Closed ¶
func (c *KafkaConsumer) Closed() <-chan struct{}
Closed returns a channel that unblocks when the consumer successfully shuts down.
func (*KafkaConsumer) CommitUpTo ¶
func (c *KafkaConsumer) CommitUpTo(msg consumer.Message) error
CommitUpTo marks this message and all previous messages in the same partition as processed. The last processed offset for each partition is periodically flushed to ZooKeeper; on startup, consumers begin processing after the last stored offset.
func (*KafkaConsumer) Errors ¶
func (c *KafkaConsumer) Errors() <-chan error
Errors returns a channel of errors for the topic. To prevent deadlocks, users must read from the error channel.
All errors returned from this channel can be safely cast to the consumer.Error interface, which allows structured access to the topic name and partition number.
func (*KafkaConsumer) Messages ¶
func (c *KafkaConsumer) Messages() <-chan consumer.Message
Messages returns a channel of messages for the topic.
If the consumer is not configured with nonzero buffer size, the Errors() channel must be read in conjunction with Messages() to prevent deadlocks.
func (*KafkaConsumer) Name ¶
func (c *KafkaConsumer) Name() string
Name returns the name of this consumer group.
func (*KafkaConsumer) Topics ¶
func (c *KafkaConsumer) Topics() []string
Topics returns the names of the topics being consumed.
type KafkaMessage ¶
type KafkaMessage struct { *kafkaConfluent.Message Consumer consumer.Consumer ClusterName string }
KafkaMessage implements Message interface
func (*KafkaMessage) Ack ¶
func (m *KafkaMessage) Ack()
func (*KafkaMessage) Cluster ¶
func (m *KafkaMessage) Cluster() string
func (*KafkaMessage) Key ¶
func (m *KafkaMessage) Key() []byte
func (*KafkaMessage) Nack ¶
func (m *KafkaMessage) Nack()
func (*KafkaMessage) Offset ¶
func (m *KafkaMessage) Offset() int64
func (*KafkaMessage) Partition ¶
func (m *KafkaMessage) Partition() int32
func (*KafkaMessage) Topic ¶
func (m *KafkaMessage) Topic() string
func (*KafkaMessage) Value ¶
func (m *KafkaMessage) Value() []byte