Documentation
¶
Index ¶
- Constants
- type Config
- type ConsumeFn
- type ConsumerConfig
- type Cronsumer
- type Header
- type Message
- type MessageBuilder
- func (mb *MessageBuilder) Build() Message
- func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder
- func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder
- func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder
- func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder
- func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder
- func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder
- type Offset
- type ProducerConfig
- type SASLConfig
Constants ¶
View Source
const ( OffsetEarliest = "earliest" OffsetLatest = "latest" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { Brokers []string `yaml:"brokers"` Consumer ConsumerConfig `yaml:"consumer"` Producer ProducerConfig `yaml:"producer"` SASL SASLConfig `yaml:"sasl"` LogLevel logger.Level `yaml:"logLevel"` Logger logger.Interface `yaml:"-"` ClientID string `yaml:"clientId"` }
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
type ConsumerConfig ¶
type ConsumerConfig struct { ClientID string `yaml:"clientId"` GroupID string `yaml:"groupId"` Topic string `yaml:"topic"` DeadLetterTopic string `yaml:"deadLetterTopic"` MinBytes int `yaml:"minBytes"` MaxBytes int `yaml:"maxBytes"` MaxRetry int `yaml:"maxRetry"` MaxWait time.Duration `yaml:"maxWait"` CommitInterval time.Duration `yaml:"commitInterval"` HeartbeatInterval time.Duration `yaml:"heartbeatInterval"` SessionTimeout time.Duration `yaml:"sessionTimeout"` RebalanceTimeout time.Duration `yaml:"rebalanceTimeout"` StartOffset Offset `yaml:"startOffset"` RetentionTime time.Duration `yaml:"retentionTime"` Concurrency int `yaml:"concurrency"` Duration time.Duration `yaml:"duration"` Cron string `yaml:"cron"` }
type Cronsumer ¶
type Cronsumer interface { // Start starts the kafka consumer KafkaCronsumer with a new goroutine so its asynchronous operation (non-blocking) Start() // Run runs the kafka consumer KafkaCronsumer with the caller goroutine so its synchronous operation (blocking) Run() // Stop stops the cron and kafka KafkaCronsumer consumer Stop() // WithLogger for injecting custom log implementation WithLogger(logger logger.Interface) // Produce produces the message to kafka KafkaCronsumer producer. Offset and Time fields will be ignored in the message. Produce(message Message) error // ProduceBatch produces the list of messages to kafka KafkaCronsumer producer. ProduceBatch(messages []Message) error // GetMetricCollectors for the purpose of making metric collectors available to other libraries GetMetricCollectors() []prometheus.Collector }
type MessageBuilder ¶ added in v0.6.2
type MessageBuilder struct {
// contains filtered or unexported fields
}
func NewMessageBuilder ¶ added in v0.6.2
func NewMessageBuilder() *MessageBuilder
func (*MessageBuilder) Build ¶ added in v0.6.2
func (mb *MessageBuilder) Build() Message
func (*MessageBuilder) WithHeaders ¶ added in v0.6.2
func (mb *MessageBuilder) WithHeaders(headers []Header) *MessageBuilder
func (*MessageBuilder) WithHighWatermark ¶ added in v0.6.2
func (mb *MessageBuilder) WithHighWatermark(highWaterMark int64) *MessageBuilder
func (*MessageBuilder) WithKey ¶ added in v0.6.2
func (mb *MessageBuilder) WithKey(key []byte) *MessageBuilder
func (*MessageBuilder) WithPartition ¶ added in v0.6.2
func (mb *MessageBuilder) WithPartition(partition int) *MessageBuilder
func (*MessageBuilder) WithTopic ¶ added in v0.6.2
func (mb *MessageBuilder) WithTopic(topic string) *MessageBuilder
func (*MessageBuilder) WithValue ¶ added in v0.6.2
func (mb *MessageBuilder) WithValue(value []byte) *MessageBuilder
type ProducerConfig ¶
type SASLConfig ¶
Click to show internal directories.
Click to hide internal directories.