Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaConfig ¶
type KafkaConfig struct { Brokers []string `yaml:"brokers"` Topics []string `yaml:"topics"` GroupID string `yaml:"groupID"` MaxRetries int `yaml:"max_retries"` }
func LoadKafkaConfig ¶
func LoadKafkaConfig(brokers []string, topics []string, groupID string, maxRetries int) *KafkaConfig
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(cfg *KafkaConfig, handlers map[string]MessageHandler, ctx context.Context) (*KafkaConsumer, error)
NewKafkaConsumer initializes a Kafka consumer.
func (*KafkaConsumer) Close ¶
func (kc *KafkaConsumer) Close() error
func (*KafkaConsumer) Start ¶
func (kc *KafkaConsumer) Start() error
Start begins consuming messages and dispatches them to appropriate handlers.
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(kafkaConfig KafkaConfig, ctx context.Context) (*KafkaProducer, error)
func (*KafkaProducer) Close ¶
func (producer *KafkaProducer) Close() error
func (*KafkaProducer) ProduceMessages ¶
func (producer *KafkaProducer) ProduceMessages(messageValue string, topic string) error
type MessageHandler ¶
type MessageHandler func(*sarama.ConsumerMessage) error
MessageHandler defines the function signature for message handlers.
Click to show internal directories.
Click to hide internal directories.