producer

package
v1.7.3 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2023 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

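// GenDefaultDeliveryHandlerFuncWithZapLogger returns a delivery handler that
// logs every kafka.Event it receives through the given zap logger.
var GenDefaultDeliveryHandlerFuncWithZapLogger = func(log *zap.Logger) func(event kafka.Event) {
	return func(e kafka.Event) {
		switch ev := e.(type) {
		case *kafka.Message:
			// A delivery report: a non-nil TopicPartition.Error means the message was not delivered.
			if ev.TopicPartition.Error != nil {
				log.Error(fmt.Sprintf("confluentKafkaProducer: Delivery failed: %v\n", ev.TopicPartition),
					zap.Any("tp", ev.TopicPartition))
				return
			}

			// Successful delivery: log topic, partition and offset at debug level.
			log.Debug(fmt.Sprintf("Successfully produced record to topic %s partition [%d] @ offset %v",
				*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset),
				zap.Any("tp", ev.TopicPartition))

		case *kafka.Error:
			// An error event reported by the client.
			log.Error("confluentKafkaProducer: Received kafka.Error msg", zap.Error(ev))
		default:
			// Any other event type is logged as an error together with its value.
			log.Error("confluentKafkaProducer: Received unknown error", zap.Any("ev", ev))
		}
	}
}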

Functions

func New

func New(configMap *kafka.ConfigMap) confluent_kafka.Producer

New constructs a confluentKafkaProducer from the given kafka.ConfigMap and returns it as a confluent_kafka.Producer.
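
New takes a *kafka.ConfigMap, which maps configuration property names to values. The sketch below shows how such a map might be assembled, using a stand-in type so it builds with the standard library only. configMap and producerConfig are hypothetical names for illustration; the keys are standard librdkafka properties, while the values are assumptions.

```go
package main

import "fmt"

// configMap is a hypothetical stand-in for kafka.ConfigMap, which is
// likewise a map from property name to value.
type configMap map[string]interface{}

// producerConfig builds a small producer configuration. The helper and the
// chosen values are illustrative only.
func producerConfig(brokers, clientID string) *configMap {
	return &configMap{
		"bootstrap.servers": brokers,  // comma-separated list of host:port brokers
		"client.id":         clientID, // identifier reported to the brokers
		"acks":              "all",    // wait for all in-sync replicas
	}
}

func main() {
	cfg := producerConfig("localhost:9092", "example-producer")

	// With the real packages, a *kafka.ConfigMap built the same way
	// would be the argument passed to New.
	fmt.Println((*cfg)["bootstrap.servers"])
}
```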

Types

This section is empty.

JackTT - Gopher 🇻🇳