Documentation
¶
Index ¶
- func NewBroker(options ...broker.Option) (broker.Broker, error)
- func NewPublisher(opts ...broker.Option) (broker.Publisher, error)
- func NewSubscriber(opts ...broker.Option) (broker.Subscriber, error)
- func WithAddrs(addrs ...string) broker.Option
- func WithAllowPublishAutoTopicCreation(allow bool) broker.Option
- func WithAsync(async bool) broker.Option
- func WithAutoAck(autoAck bool) broker.Option
- func WithBatchBytes(bytes int64) broker.Option
- func WithBatchSize(size int) broker.Option
- func WithBatchTimeout(timeout time.Duration) broker.Option
- func WithCommitInterval(interval time.Duration) broker.Option
- func WithCrc32Balancer(consistent bool) broker.Option
- func WithDialerConfig(config *tls.Config) broker.Option
- func WithDialerTimeout(timeout time.Duration) broker.Option
- func WithHashBalancer(hasher hash.Hash32) broker.Option
- func WithHeartbeatInterval(interval time.Duration) broker.Option
- func WithLeastBytesBalancer() broker.Option
- func WithMaxAttempts(attempts int) broker.Option
- func WithMaxBytes(maxBytes int) broker.Option
- func WithMaxWait(maxWait time.Duration) broker.Option
- func WithMinBytes(minBytes int) broker.Option
- func WithMurmur2Balancer(consistent bool) broker.Option
- func WithPartitionWatchInterval(interval time.Duration) broker.Option
- func WithQueue(queue string) broker.Option
- func WithQueueCapacity(capacity int) broker.Option
- func WithReadLagInterval(interval time.Duration) broker.Option
- func WithReadTimeout(timeout time.Duration) broker.Option
- func WithRebalanceTimeout(timeout time.Duration) broker.Option
- func WithReferenceHashBalancer(hasher hash.Hash32) broker.Option
- func WithRetentionTime(retention time.Duration) broker.Option
- func WithRetries(retries int) broker.Option
- func WithRoundRobinBalancer() broker.Option
- func WithSASLMechanism(mechanism sasl.Mechanism) broker.Option
- func WithSessionTimeout(timeout time.Duration) broker.Option
- func WithStartOffset(offset int64) broker.Option
- func WithTLSConfig(config *tls.Config) broker.Option
- func WithTracings(opts ...tracing.Option) broker.Option
- func WithWatchPartitionChanges(watch bool) broker.Option
- func WithWriteTimeout(timeout time.Duration) broker.Option
- type BalancerName
- type MessageCarrier
- type Writer
- type WriterConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewSubscriber ¶
func NewSubscriber(opts ...broker.Option) (broker.Subscriber, error)
func WithCommitInterval ¶
提交offset的时间间隔
func WithCrc32Balancer ¶
WithCrc32Balancer CRC32负载均衡器
func WithHashBalancer ¶
WithHashBalancer Hash负载均衡器
func WithHeartbeatInterval ¶
心跳间隔时间
func WithLeastBytesBalancer ¶
WithLeastBytesBalancer LeastBytes负载均衡器
func WithMurmur2Balancer ¶
WithMurmur2Balancer Murmur2负载均衡器
func WithPartitionWatchInterval ¶
分区监控检查间隔
func WithReferenceHashBalancer ¶
WithReferenceHashBalancer ReferenceHash负载均衡器
func WithRoundRobinBalancer ¶
WithRoundRobinBalancer RoundRobin负载均衡器,默认均衡器。
func WithSASLMechanism ¶
WithSASLMechanism 设置SASL机制
Types ¶
type BalancerName ¶
type BalancerName string
const ( DefaultAddr = "127.0.0.1:9092" LeastBytesBalancer BalancerName = "LeastBytes" RoundRobinBalancer BalancerName = "RoundRobin" HashBalancer BalancerName = "Hash" ReferenceHashBalancer BalancerName = "ReferenceHash" Crc32Balancer BalancerName = "CRC32Balancer" Murmur2Balancer BalancerName = "Murmur2Balancer" )
type MessageCarrier ¶
type MessageCarrier struct {
// contains filtered or unexported fields
}
func NewMessageCarrier ¶
func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier
func (MessageCarrier) Get ¶
func (c MessageCarrier) Get(key string) string
func (MessageCarrier) Keys ¶
func (c MessageCarrier) Keys() []string
func (MessageCarrier) Set ¶
func (c MessageCarrier) Set(key, val string)
type Writer ¶
func (*Writer) CreateProducer ¶
func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer
CreateProducer 创建一个kafka-go Writer实例
type WriterConfig ¶
type WriterConfig struct { // Kafka集群的broker地址列表 Brokers []string // 用于在分区之间分配消息的均衡器 // // 默认使用轮询分配方式 Balancer kafkaGo.Balancer // 消息投递的最大重试次数 // // 默认最多重试10次 MaxAttempts int // 发送到分区前缓冲的消息数量限制 // // 默认的目标批次大小是100条消息 BatchSize int // 发送到分区前请求的最大字节数限制 // // 默认使用Kafka的默认值1048576字节 BatchBytes int64 // 未满批次的消息发送到Kafka的时间间隔 // // 默认至少每秒刷新一次 BatchTimeout time.Duration // Writer执行读操作的超时时间 // // 默认10秒 ReadTimeout time.Duration // Writer执行写操作的超时时间 // // 默认10秒 WriteTimeout time.Duration // 生产请求需要等待的分区副本确认数 // 默认值为-1,表示等待所有副本确认 // 大于0的值表示需要多少个副本确认才算成功 // // 当前kafka-go版本(v0.3)不支持设置为0 // 如果需要该功能,需要升级到v0.4版本 RequiredAcks kafkaGo.RequiredAcks // 设置为true时,WriteMessages方法将永不阻塞 // 这也意味着错误会被忽略,因为调用者无法接收返回值 // 仅在不关心消息是否成功写入Kafka时使用此选项 Async bool // 如果不为nil,用于报告Writer内部变化的日志记录器 Logger kafkaGo.Logger // 用于报告错误的日志记录器 // 如果为nil,Writer将使用Logger代替 ErrorLogger kafkaGo.Logger // 允许Writer在主题不存在时自动创建主题 AllowAutoTopicCreation bool }
Click to show internal directories.
Click to hide internal directories.