kafka

package
v0.0.0-...-6be2836 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 6, 2025 License: MIT Imports: 17 Imported by: 0

README

Publisher

目前每个主题都会在内部对应一个发布者,但是一个Publisher中的所有发布者共用一份配置。后续考虑在Publish时,添加自定义配置。

Subscriber

目前每个主题都会在内部对应一个订阅者,但是一个Subscriber中的所有订阅者共用一份配置。后续考虑在Subscribe时,添加自定义配置。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBroker

func NewBroker(options ...broker.Option) (broker.Broker, error)

func NewPublisher

func NewPublisher(opts ...broker.Option) (broker.Publisher, error)

func NewSubscriber

func NewSubscriber(opts ...broker.Option) (broker.Subscriber, error)

func WithAddrs

func WithAddrs(addrs ...string) broker.Option

WithAddrs 设置Kafka集群的broker地址列表

func WithAllowPublishAutoTopicCreation

func WithAllowPublishAutoTopicCreation(allow bool) broker.Option

设置为true时,允许在发布消息时自动创建主题

默认不允许自动创建主题

func WithAsync

func WithAsync(async bool) broker.Option

设置为true时,WriteMessages方法将永不阻塞 这也意味着错误会被忽略,因为调用者无法接收返回值 仅在不关心消息是否成功写入Kafka时使用此选项

func WithAutoAck

func WithAutoAck(autoAck bool) broker.Option

自动提交消息确认

func WithBatchBytes

func WithBatchBytes(bytes int64) broker.Option

发送到分区前请求的最大字节数限制

默认使用Kafka的默认值1048576字节

func WithBatchSize

func WithBatchSize(size int) broker.Option

发送到分区前缓冲的消息数量限制

默认的目标批次大小是100条消息

func WithBatchTimeout

func WithBatchTimeout(timeout time.Duration) broker.Option

未满批次的消息发送到Kafka的时间间隔

默认至少每秒刷新一次

func WithCommitInterval

func WithCommitInterval(interval time.Duration) broker.Option

提交offset的时间间隔

func WithCrc32Balancer

func WithCrc32Balancer(consistent bool) broker.Option

WithCrc32Balancer CRC32负载均衡器

func WithDialerConfig

func WithDialerConfig(config *tls.Config) broker.Option

拨号器配置

func WithDialerTimeout

func WithDialerTimeout(timeout time.Duration) broker.Option

拨号超时时间

func WithHashBalancer

func WithHashBalancer(hasher hash.Hash32) broker.Option

WithHashBalancer Hash负载均衡器

func WithHeartbeatInterval

func WithHeartbeatInterval(interval time.Duration) broker.Option

心跳间隔时间

func WithLeastBytesBalancer

func WithLeastBytesBalancer() broker.Option

WithLeastBytesBalancer LeastBytes负载均衡器

func WithMaxAttempts

func WithMaxAttempts(attempts int) broker.Option

消息投递的最大重试次数

默认最多重试10次

func WithMaxBytes

func WithMaxBytes(maxBytes int) broker.Option

每次拉取消息的最大字节数

func WithMaxWait

func WithMaxWait(maxWait time.Duration) broker.Option

等待拉取消息的最大时间

func WithMinBytes

func WithMinBytes(minBytes int) broker.Option

每次拉取消息的最小字节数

func WithMurmur2Balancer

func WithMurmur2Balancer(consistent bool) broker.Option

WithMurmur2Balancer Murmur2负载均衡器

func WithPartitionWatchInterval

func WithPartitionWatchInterval(interval time.Duration) broker.Option

分区监控检查间隔

func WithQueue

func WithQueue(queue string) broker.Option

设置消息队列

func WithQueueCapacity

func WithQueueCapacity(capacity int) broker.Option

设置队列容量

func WithReadLagInterval

func WithReadLagInterval(interval time.Duration) broker.Option

读取延迟检查间隔

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) broker.Option

Writer执行读操作的超时时间

默认10秒

func WithRebalanceTimeout

func WithRebalanceTimeout(timeout time.Duration) broker.Option

重平衡超时时间

func WithReferenceHashBalancer

func WithReferenceHashBalancer(hasher hash.Hash32) broker.Option

WithReferenceHashBalancer ReferenceHash负载均衡器

func WithRetentionTime

func WithRetentionTime(retention time.Duration) broker.Option

消息保留时间

func WithRetries

func WithRetries(retries int) broker.Option

WithRetries 设置重试次数,默认不重试

func WithRoundRobinBalancer

func WithRoundRobinBalancer() broker.Option

WithRoundRobinBalancer RoundRobin负载均衡器,默认均衡器。

func WithSASLMechanism

func WithSASLMechanism(mechanism sasl.Mechanism) broker.Option

WithSASLMechanism 设置SASL机制

func WithSessionTimeout

func WithSessionTimeout(timeout time.Duration) broker.Option

会话超时时间

func WithStartOffset

func WithStartOffset(offset int64) broker.Option

起始偏移量

func WithTLSConfig

func WithTLSConfig(config *tls.Config) broker.Option

WithTLSConfig 设置TLS配置

func WithTracings

func WithTracings(opts ...tracing.Option) broker.Option

WithTracings 设置追踪选项

func WithWatchPartitionChanges

func WithWatchPartitionChanges(watch bool) broker.Option

是否监控分区变化

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) broker.Option

Writer执行写操作的超时时间

默认10秒

Types

type BalancerName

type BalancerName string
const (
	DefaultAddr = "127.0.0.1:9092"

	LeastBytesBalancer    BalancerName = "LeastBytes"
	RoundRobinBalancer    BalancerName = "RoundRobin"
	HashBalancer          BalancerName = "Hash"
	ReferenceHashBalancer BalancerName = "ReferenceHash"
	Crc32Balancer         BalancerName = "CRC32Balancer"
	Murmur2Balancer       BalancerName = "Murmur2Balancer"
)

type MessageCarrier

type MessageCarrier struct {
	// contains filtered or unexported fields
}

func NewMessageCarrier

func NewMessageCarrier(msg *kafkaGo.Message) MessageCarrier

func (MessageCarrier) Get

func (c MessageCarrier) Get(key string) string

func (MessageCarrier) Keys

func (c MessageCarrier) Keys() []string

func (MessageCarrier) Set

func (c MessageCarrier) Set(key, val string)

type Writer

type Writer struct {
	Writer  *kafkaGo.Writer
	Writers map[string]*kafkaGo.Writer
}

func NewWriter

func NewWriter() *Writer

func (*Writer) Close

func (w *Writer) Close()

func (*Writer) CreateProducer

func (w *Writer) CreateProducer(writerConfig WriterConfig, saslMechanism sasl.Mechanism, tlsConfig *tls.Config) *kafkaGo.Writer

CreateProducer 创建一个kafka-go Writer实例

type WriterConfig

type WriterConfig struct {
	// Kafka集群的broker地址列表
	Brokers []string

	// 用于在分区之间分配消息的均衡器
	//
	// 默认使用轮询分配方式
	Balancer kafkaGo.Balancer

	// 消息投递的最大重试次数
	//
	// 默认最多重试10次
	MaxAttempts int

	// 发送到分区前缓冲的消息数量限制
	//
	// 默认的目标批次大小是100条消息
	BatchSize int

	// 发送到分区前请求的最大字节数限制
	//
	// 默认使用Kafka的默认值1048576字节
	BatchBytes int64

	// 未满批次的消息发送到Kafka的时间间隔
	//
	// 默认至少每秒刷新一次
	BatchTimeout time.Duration

	// Writer执行读操作的超时时间
	//
	// 默认10秒
	ReadTimeout time.Duration

	// Writer执行写操作的超时时间
	//
	// 默认10秒
	WriteTimeout time.Duration

	// 生产请求需要等待的分区副本确认数
	// 默认值为-1,表示等待所有副本确认
	// 大于0的值表示需要多少个副本确认才算成功
	//
	// 当前kafka-go版本(v0.3)不支持设置为0
	// 如果需要该功能,需要升级到v0.4版本
	RequiredAcks kafkaGo.RequiredAcks

	// 设置为true时,WriteMessages方法将永不阻塞
	// 这也意味着错误会被忽略,因为调用者无法接收返回值
	// 仅在不关心消息是否成功写入Kafka时使用此选项
	Async bool

	// 如果不为nil,用于报告Writer内部变化的日志记录器
	Logger kafkaGo.Logger

	// 用于报告错误的日志记录器
	// 如果为nil,Writer将使用Logger代替
	ErrorLogger kafkaGo.Logger

	// 允许Writer在主题不存在时自动创建主题
	AllowAutoTopicCreation bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳