Versions in this module Expand all Collapse all v1 v1.3.4 May 13, 2020 v1.3.3 May 8, 2020 Changes in this version + var EveryPartitionLastMessage sync.Map + func CloseConsumer() error + func CommitOffsetForAllPartition(onCommit func(message kafka.Message)) error + func Consume() (kafka.Message, error) + func ConsumeByCallback(consume func(kafka.Message, error) bool) + func NewConsumer() + type PC struct + func (pc *PC) BlockingQueueSize() int + func (pc *PC) Cancel() bool + func (pc *PC) Init(capacity int, batchSize int, batchTimeOut time.Duration) + func (pc *PC) Produce(object interface{}) + func (pc *PC) Subscribe(mapTo func(interface{}) kafka.Message, consume func([]kafka.Message)) + type Producer struct + func Init(topic string) *Producer + func LoadProducerByTopic(topic string) *Producer + func (p *Producer) CloseProducer() error + func (p *Producer) Produce(msg []byte) error + func (p *Producer) ProduceMsgs(msgs []kafka.Message) error + func (p *Producer) ProduceWithKey(key []byte, value []byte) error