kafka

package
v0.0.0-...-431a833 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Dialer

func Dialer(algo scram.Algorithm, username, password string, timeout time.Duration, dualStack bool) (sasl.Mechanism, *kafka.Dialer)

Dialer 返回kafka.Dialer方法

Types

type Interface

type Interface interface {
	LeaderHost(ctx context.Context) (string, error)
	Write(ctx context.Context, topic string, partition int, writeTimeout time.Duration, key, value []byte) error
	Read(ctx context.Context, topic string, partition int, readTimeout time.Duration) (*kafka.Batch, error)
	CreateTopics(ctx context.Context, topics []string, partition, replicationFactor int) error
	DeleteTopics(ctx context.Context, topics []string) error
	ListTopics(ctx context.Context) ([]string, error)
	DeleteAllTopics(ctx context.Context) error
}

type Options

type Options struct {
	Username               string          `json:"username,omitempty" yaml:"username,omitempty" xml:"username,omitempty"`
	Password               string          `json:"password,omitempty" yaml:"password,omitempty" xml:"password,omitempty"`
	Brokers                []string        `json:"brokers,omitempty" yaml:"brokers,omitempty" xml:"brokers,omitempty"`
	Topics                 []string        `json:"topics,omitempty" yaml:"topics,omitempty" xml:"topics,omitempty"`
	Timeout                time.Duration   `json:"timeout,omitempty" yaml:"timeout,omitempty" xml:"timeout,omitempty"`
	Async                  bool            `json:"async,omitempty" yaml:"async,omitempty" xml:"async,omitempty"`
	Algo                   scram.Algorithm `json:"algo,omitempty" yaml:"algo,omitempty" xml:"algo,omitempty"`
	Protocol               string          `json:"protocol,omitempty" yaml:"protocol,omitempty" xml:"protocol,omitempty"`
	AllowAutoTopicCreation bool            `json:"allowAutoTopicCreation,omitempty" yaml:"allowAutoTopicCreation,omitempty" xml:"allowAutoTopicCreation,omitempty"`
}

Options Kafka 配置Options

func NewKafkaOptions

func NewKafkaOptions() *Options

func (*Options) CreateTopics

func (k *Options) CreateTopics(ctx context.Context, topics []string, partition, replicationFactor int) error

CreateTopics 创建Kafka主题 该函数通过连接到Kafka集群的领导者节点来创建指定的主題 参数:

ctx: 上下文,用于取消操作
topics: 待创建的主题列表
partition: 每个主题的分区数量
replicationFactor: 主题的副本因子,表示副本的数量

返回值:

error: 表示创建主题时的错误,如果没有错误则是nil

func (*Options) DeleteAllTopics

func (k *Options) DeleteAllTopics(ctx context.Context) error

DeleteAllTopics 删除与特定连接相关联的所有topics 该方法首先通过ListTopicByConn函数获取所有与连接相关的topics列表 然后使用DeleteTopicsByConn函数来删除这些topics 参数:

ctx: 用于取消操作的上下文,可以在整个函数执行的任何时间点检查上下文并返回

返回值:

error: 如果列出或删除topics时发生错误,则返回错误

func (*Options) DeleteTopics

func (k *Options) DeleteTopics(ctx context.Context, topics []string) error

DeleteTopics 通过连接到Kafka集群的领导者来删除指定的主题 它接受一个上下文对象,用于日志记录和获取Kafka领导者主机地址 参数:

topics: 是一个字符串切片,包含了需要删除的主题名称列表

返回值:

error: 一个错误类型,如果执行过程中发生错误,则返回相应的错误信息

func (*Options) LeaderHost

func (k *Options) LeaderHost(ctx context.Context) (string, error)

LeaderHost 通过上下文获取当前Kafka集群的控制节点主机信息 该函数主要用于确定哪个Kafka节点目前是控制节点 参数:

ctx: 上下文,可用于传递超时、截止时间或取消请求等信息

返回值:

string: 当前控制节点的主机信息(IP地址和端口号)
error: 在获取控制节点主机信息过程中可能遇到的错误,如果没有错误,则为nil

func (*Options) ListTopics

func (k *Options) ListTopics(ctx context.Context) ([]string, error)

ListTopics 根据连接列出Kafka中的所有主题 该函数通过连接到Kafka集群的领导者实例来获取所有主题列表 参数:

ctx: 上下文,用于传递请求范围的值、配置 deadline 等

返回值:

[]string: 包含所有主题名称的切片
error: 如果连接到Kafka实例或读取分区信息时发生错误,则返回错误

func (*Options) Read

func (k *Options) Read(ctx context.Context, topic string, partition int, readTimeout time.Duration) (*kafka.Batch, error)

Read 通过指定的kafka连接读取指定主题和分区的数据 参数:

ctx: 上下文,用于传递请求范围的值、配置截止时间以及取消操作
topic: 要读取数据的主题名称
partition: 要读取数据的分区编号
readTimeout: 读取操作的超时时间

返回值:

*kafka.Batch: 读取到的消息批次
error: 如果发生错误,则返回错误信息

func (*Options) Write

func (k *Options) Write(ctx context.Context, topic string, partition int, writeTimeout time.Duration, key, value []byte) error

Write 通过Kafka连接发送消息 该函数负责将一条消息发送到指定的Kafka主题和分区。它首先尝试与Kafka集群的Leader节点建立连接, 然后设置写超时时间,最后发送消息并关闭连接 参数:

ctx: 上下文,用于传递超时、取消信号和日志等信息
topic: 消息要发送到的Kafka主题
partition: 消息要发送到的主题内的分区
writeTimeout: 设置写操作(发送消息)的超时时间
key: 消息的键,可以为空
value: 消息的值

返回值:

error: 如果连接、设置超时、发送消息或关闭连接过程中发生错误,则返回该错误

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳