Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Conf ¶
type Conf struct { // kafka broker addresses Brokers []string `json:"brokers" yaml:"brokers"` // topic name Topic string `json:"topic" yaml:"topic"` // consumer group name Group string `json:"group" yaml:"group"` // whether to create topic if topic is missing, default false AutoCreateTopic *bool `json:"auto_create_topic" yaml:"auto_create_topic"` // the count of the topics to create, default 4 Partitions int `json:"topic_partitions" yaml:"topic_partitions"` // the replication count of each topic partition, default 3 Replications int `json:"replications" yaml:"replications"` // the count of workers that consumes synchronously, default is the count of topic partition Processors int `json:"processors" yaml:"processors"` // default 10K MinBytes int `json:"min_bytes" yaml:"min_bytes"` // default 10M MaxBytes int `json:"max_bytes" yaml:"max_bytes"` // certificate file path for connecting to kafka CaFile string `json:"ca_file" yaml:"ca_file"` // username for connecting to kafka Username string `json:"username" yaml:"username"` // password for connecting to kafka Password string `json:"password" yaml:"password"` }
func LoadConfig ¶
LoadConfig loads Conf from specified file path
func MustLoadConfig ¶
MustLoadConfig loads Conf from specified file path,panics on error
type ConsumeHandler ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func MustNewConsumer ¶
func MustNewConsumer(cfg *Conf, handler ConsumeHandler, opts ...ConsumerOption) *Consumer
MustNewConsumer returns a consumer, if it fails, panic
func NewConsumer ¶
func NewConsumer(cfg *Conf, handler ConsumeHandler, opts ...ConsumerOption) (*Consumer, error)
NewConsumer returns a consumer and error
func (*Consumer) LoopConsume ¶
func (c *Consumer) LoopConsume()
LoopConsume blocks and consumes msgs in loop with multi goroutine
type ConsumerOption ¶
type ConsumerOption func(consumer *Consumer)
func WithConsumerContext ¶
func WithConsumerContext(ctx context.Context) ConsumerOption
func WithConsumerListener ¶
func WithConsumerListener(listener plugins.ConsumeListener) ConsumerOption
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func MustNewProducer ¶
func MustNewProducer(cfg *Conf, opts ...ProducerOption) *Producer
MustNewProducer returns a producer or panic if fails
func NewProducer ¶
func NewProducer(cfg *Conf, opts ...ProducerOption) (*Producer, error)
NewProducer returns a producer and an error
type ProducerOption ¶
type ProducerOption func(producer *Producer)
func WithIdCreator ¶
func WithIdCreator(creator plugins.IdCreator) ProducerOption
func WithProducerContext ¶
func WithProducerContext(ctx context.Context) ProducerOption
func WithProducerListener ¶
func WithProducerListener(listener plugins.ProducerListener) ProducerOption
Click to show internal directories.
Click to hide internal directories.