Documentation
¶
Index ¶
- Variables
- func CloseDefaultProducer()
- func CloseProducer(name string) (err error)
- func LoadCfgs() (r map[string]*Cfg, err error)
- func Publish(name string, body string, key string, partition int32) (err error)
- func SetFailFeedback(name string, fn func(*kafka.Message, string)) (err error)
- func SetSucFeedback(name string, fn func(*kafka.Message, string)) (err error)
- type Cfg
- type KafkaProducer
- type Partitioner
- type PartitionerConstructor
Constants ¶
This section is empty.
Variables ¶
View Source
var ( DefaultSucFeedbackFn = func(suc *kafka.Message, body string) { seelog.Infof("[KAFKA PRODUCER] publish success: %s, message: %s", suc.TopicPartition.String(), body) } DefaultFailFeedbackFn = func(fail *kafka.Message, body string) { seelog.Errorf("[KAFKA PRODUCER] publish fail: %s, message: %s", fail.TopicPartition.String(), body) } )
Functions ¶
func CloseDefaultProducer ¶
func CloseDefaultProducer()
func CloseProducer ¶
func SetFailFeedback ¶
Types ¶
type Cfg ¶
type Cfg struct { Addrs []string `toml:"addrs"` Acks int `toml:"acks"` // 等待服务器完成到如何进度在响应 Topic string `toml:"topic"` // 默认topic.当不指定topic时候使用该值 Partitioner string `toml:"partitioner"` // 指定分区选择器 ReturnSuccesses bool `toml:"return_successes"` // 是否等待成功的响应,仅RequireAcks设置不是NoReponse才有效 ReturnErrors bool `toml:"return_errors"` // 是否等待失败的响应,仅RequireAcks设置不是NoReponse才有效 ReturnFeedbackNum int `toml:"return_feedback_num"` // 等待响应的并发数 }
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func DefaultProducer ¶ added in v0.2.8
func DefaultProducer() (r *KafkaProducer)
func NewKafkaProducer ¶
func NewKafkaProducer(cfg *Cfg) (r *KafkaProducer, err error)
func Producer ¶
func Producer(name string) (r *KafkaProducer)
func SafeProducer ¶
func SafeProducer(name string) (r *KafkaProducer, err error)
func (*KafkaProducer) Close ¶
func (this *KafkaProducer) Close()
func (*KafkaProducer) Publish ¶
func (this *KafkaProducer) Publish(body string, key string, partition int32)
func (*KafkaProducer) SetFailFeedback ¶
func (this *KafkaProducer) SetFailFeedback(fn func(*kafka.Message, string))
func (*KafkaProducer) SetSucFeedback ¶
func (this *KafkaProducer) SetSucFeedback(fn func(*kafka.Message, string))
type Partitioner ¶ added in v0.2.0
type Partitioner interface { // Partition takes a message and partition count and chooses a partition Partition(message *kafka.Message, numPartitions int32) (r int32, err error) }
func NewHashPartitioner ¶ added in v0.2.0
func NewHashPartitioner() Partitioner
func NewManualPartitioner ¶ added in v0.2.0
func NewManualPartitioner() Partitioner
func NewRandomPartitioner ¶ added in v0.2.0
func NewRandomPartitioner() Partitioner
func NewRoundRobinPartitioner ¶ added in v0.2.0
func NewRoundRobinPartitioner() Partitioner
type PartitionerConstructor ¶ added in v0.2.0
type PartitionerConstructor func() Partitioner
Click to show internal directories.
Click to hide internal directories.