Documentation
¶
Index ¶
- func Dialer(algo scram.Algorithm, username, password string, timeout time.Duration, ...) (sasl.Mechanism, *kafka.Dialer)
- type Interface
- type Options
- func (k *Options) CreateTopics(ctx context.Context, topics []string, partition, replicationFactor int) error
- func (k *Options) DeleteAllTopics(ctx context.Context) error
- func (k *Options) DeleteTopics(ctx context.Context, topics []string) error
- func (k *Options) LeaderHost(ctx context.Context) (string, error)
- func (k *Options) ListTopics(ctx context.Context) ([]string, error)
- func (k *Options) Read(ctx context.Context, topic string, partition int, readTimeout time.Duration) (*kafka.Batch, error)
- func (k *Options) Write(ctx context.Context, topic string, partition int, writeTimeout time.Duration, ...) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Interface ¶
type Interface interface { LeaderHost(ctx context.Context) (string, error) Write(ctx context.Context, topic string, partition int, writeTimeout time.Duration, key, value []byte) error Read(ctx context.Context, topic string, partition int, readTimeout time.Duration) (*kafka.Batch, error) CreateTopics(ctx context.Context, topics []string, partition, replicationFactor int) error DeleteTopics(ctx context.Context, topics []string) error ListTopics(ctx context.Context) ([]string, error) DeleteAllTopics(ctx context.Context) error }
type Options ¶
type Options struct { Username string `json:"username,omitempty" yaml:"username,omitempty" xml:"username,omitempty"` Password string `json:"password,omitempty" yaml:"password,omitempty" xml:"password,omitempty"` Brokers []string `json:"brokers,omitempty" yaml:"brokers,omitempty" xml:"brokers,omitempty"` Topics []string `json:"topics,omitempty" yaml:"topics,omitempty" xml:"topics,omitempty"` Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty" xml:"timeout,omitempty"` Async bool `json:"async,omitempty" yaml:"async,omitempty" xml:"async,omitempty"` Algo scram.Algorithm `json:"algo,omitempty" yaml:"algo,omitempty" xml:"algo,omitempty"` Protocol string `json:"protocol,omitempty" yaml:"protocol,omitempty" xml:"protocol,omitempty"` AllowAutoTopicCreation bool `json:"allowAutoTopicCreation,omitempty" yaml:"allowAutoTopicCreation,omitempty" xml:"allowAutoTopicCreation,omitempty"` }
Options Kafka 配置Options
func NewKafkaOptions ¶
func NewKafkaOptions() *Options
func (*Options) CreateTopics ¶
func (k *Options) CreateTopics(ctx context.Context, topics []string, partition, replicationFactor int) error
CreateTopics 创建Kafka主题 该函数通过连接到Kafka集群的领导者节点来创建指定的主題 参数:
ctx: 上下文,用于取消操作 topics: 待创建的主题列表 partition: 每个主题的分区数量 replicationFactor: 主题的副本因子,表示副本的数量
返回值:
error: 表示创建主题时的错误,如果没有错误则是nil
func (*Options) DeleteAllTopics ¶
DeleteAllTopics 删除与特定连接相关联的所有topics 该方法首先通过ListTopicByConn函数获取所有与连接相关的topics列表 然后使用DeleteTopicsByConn函数来删除这些topics 参数:
ctx: 用于取消操作的上下文,可以在整个函数执行的任何时间点检查上下文并返回
返回值:
error: 如果列出或删除topics时发生错误,则返回错误
func (*Options) DeleteTopics ¶
DeleteTopics 通过连接到Kafka集群的领导者来删除指定的主题 它接受一个上下文对象,用于日志记录和获取Kafka领导者主机地址 参数:
topics: 是一个字符串切片,包含了需要删除的主题名称列表
返回值:
error: 一个错误类型,如果执行过程中发生错误,则返回相应的错误信息
func (*Options) LeaderHost ¶
LeaderHost 通过上下文获取当前Kafka集群的控制节点主机信息 该函数主要用于确定哪个Kafka节点目前是控制节点 参数:
ctx: 上下文,可用于传递超时、截止时间或取消请求等信息
返回值:
string: 当前控制节点的主机信息(IP地址和端口号) error: 在获取控制节点主机信息过程中可能遇到的错误,如果没有错误,则为nil
func (*Options) ListTopics ¶
ListTopics 根据连接列出Kafka中的所有主题 该函数通过连接到Kafka集群的领导者实例来获取所有主题列表 参数:
ctx: 上下文,用于传递请求范围的值、配置 deadline 等
返回值:
[]string: 包含所有主题名称的切片 error: 如果连接到Kafka实例或读取分区信息时发生错误,则返回错误
func (*Options) Read ¶
func (k *Options) Read(ctx context.Context, topic string, partition int, readTimeout time.Duration) (*kafka.Batch, error)
Read 通过指定的kafka连接读取指定主题和分区的数据 参数:
ctx: 上下文,用于传递请求范围的值、配置截止时间以及取消操作 topic: 要读取数据的主题名称 partition: 要读取数据的分区编号 readTimeout: 读取操作的超时时间
返回值:
*kafka.Batch: 读取到的消息批次 error: 如果发生错误,则返回错误信息
func (*Options) Write ¶
func (k *Options) Write(ctx context.Context, topic string, partition int, writeTimeout time.Duration, key, value []byte) error
Write 通过Kafka连接发送消息 该函数负责将一条消息发送到指定的Kafka主题和分区。它首先尝试与Kafka集群的Leader节点建立连接, 然后设置写超时时间,最后发送消息并关闭连接 参数:
ctx: 上下文,用于传递超时、取消信号和日志等信息 topic: 消息要发送到的Kafka主题 partition: 消息要发送到的主题内的分区 writeTimeout: 设置写操作(发送消息)的超时时间 key: 消息的键,可以为空 value: 消息的值
返回值:
error: 如果连接、设置超时、发送消息或关闭连接过程中发生错误,则返回该错误
Click to show internal directories.
Click to hide internal directories.