Documentation
¶
Index ¶
- Variables
- func DeserializeKafkaID(messageID []byte) int64
- func NewKafkaClientInstance(address string) *kafkaClient
- func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient
- func NewKafkaClientInstanceWithConfigMap(config kafka.ConfigMap, extraConsumerConfig kafka.ConfigMap, ...) *kafkaClient
- func SerializeKafkaID(messageID int64) []byte
- type Consumer
- func (kc *Consumer) Ack(message mqwrapper.Message)
- func (kc *Consumer) Chan() <-chan mqwrapper.Message
- func (kc *Consumer) CheckTopicValid(topic string) error
- func (kc *Consumer) Close()
- func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error)
- func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error
- func (kc *Consumer) Subscription() string
Constants ¶
This section is empty.
Variables ¶
View Source
var Producer *kafka.Producer
Functions ¶
func DeserializeKafkaID ¶
func NewKafkaClientInstance ¶
func NewKafkaClientInstance(address string) *kafkaClient
func NewKafkaClientInstanceWithConfig ¶
func NewKafkaClientInstanceWithConfig(config *paramtable.KafkaConfig) *kafkaClient
func SerializeKafkaID ¶
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) Chan ¶
Chan provides a channel to read consumed message. confluent-kafka-go recommend us to use function-based consumer, channel-based consumer API had already deprecated, see more details https://github.com/confluentinc/confluent-kafka-go.
func (*Consumer) CheckTopicValid ¶
func (*Consumer) Subscription ¶
Click to show internal directories.
Click to hide internal directories.