Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( DefaultOptions = Options{} DefaultMetricNamespace = "metrics" DefaultMetricSubsystem = "gkc" DefaultChannelSize = 2000 )
Functions ¶
func DefaultConfluentConfig ¶
func DefaultConfluentConfig(config *ConsumerConfig) *kafka.ConfigMap
Types ¶
type Consumer ¶
type Consumer interface { // Name returns the name of this consumer group. Name() string // Topics returns the names of the topics being consumed. Topics() []string // Start starts the consumer Start() error // Stop stops the consumer Stop() error // Closed returns a channel which will be closed after this consumer is completely shutdown Closed() <-chan struct{} // Messages return the message channel for this consumer // Messages() <-chan Message Messages() <-chan *Message DisableLog() }
func NewConsumer ¶
func NewConsumer(config *ConsumerConfig) (Consumer, error)
type ConsumerConfig ¶
type ConsumerConfig struct { // GroupName identifies your consumer group. Unless your application creates // multiple consumer groups (in which case it's suggested to have application name as // prefix of the group name), this should match your application name. GroupName string // Topic is the name of topic to consume from. Topics []string // Broker is the list of brokers in the kafka cluster to consume from. Broker string // Defines the logic after processing the kafka message MessageHook Hook // Defines the logic after processing the failed kafka message from DLQ ErrorHook Hook // Enable prometheus metrics ExposeMetrics bool // Prometheus address to export metrics on Address string }
type Counter ¶
func NewCounter ¶
type Hook ¶
func NewHookFunc ¶
Click to show internal directories.
Click to hide internal directories.