Documentation
¶
Overview ¶
Package kafka manages Kafka clients.
@update 2023-03-28 02:01:25
Index ¶
Constants ¶
const ( // MechanismSASLPlain SASL PLAINTEXT authentication // @update 2024-01-03 06:54:55 MechanismSASLPlain = "SASL_PLAINTEXT" // MechanismSASLSCRAMSHA256 SASL SCRAM-SHA-256 authentication // @update 2024-01-03 06:55:11 MechanismSASLSCRAMSHA256 = "SASL_SCRAM_SHA_256" // MechanismSASLSCRAMSHA512 SASL SCRAM-SHA-512 authentication // @update 2024-01-03 06:55:17 MechanismSASLSCRAMSHA512 = "SASL_SCRAM_SHA_512" )
Mechanism constants for the supported SASL authentication mechanisms.
Variables ¶
var ( // ErrNoTopics GetTopicsFunc got no topic // @update 2023-03-14 01:18:01 ErrNoTopics = errors.New("no topics found") // ErrTooManyConsumeError consumer.ErrCount reach MaxConsumeErrorCount // @update 2023-03-15 02:02:27 ErrTooManyConsumeError = errors.New("too many errors when consuming data") // ErrClosedConsumer error when try to close closed consumer // @update 2023-03-15 02:03:09 ErrClosedConsumer = errors.New("the consumer has been closed") // ErrClosedProducer error when try to close closed producer // @update 2023-03-30 05:12:28 ErrClosedProducer = errors.New("the producer has been closed") )
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { ConsumerConfig // contains filtered or unexported fields }
Consumer struct holds data related to the consumer
@author kevineluo @update 2023-02-24 01:53:37
func NewConsumer ¶
func NewConsumer(ctx context.Context, config ConsumerConfig) (c *Consumer, err error)
NewConsumer creates a new Kafka consumer.
@param ctx context.Context @param config ConsumerConfig @return c *Consumer @return err error @author kevineluo @update 2023-03-14 01:12:16
func (*Consumer) CheckState ¶ added in v0.0.4
func (consumer *Consumer) CheckState()
CheckState checks the consumer state.
@receiver consumer *Consumer @author kevineluo @update 2023-04-18 01:34:40
type ConsumerConfig ¶
type ConsumerConfig struct { // the Kafka broker address, the default value is "localhost:9092". // If you have multiple brokers, you can use a comma to separate them, such as "localhost:9092,localhost:9093,localhost:9094". // default: "localhost:9092" Bootstrap string `json:"bootstrap"` // Group ID of consumer GroupID string `json:"group_id"` // If no message received after [MaxMsgInterval] seconds then restart Consumer, default: 300 seconds MaxMsgInterval time.Duration `json:"max_msg_interval"` // Interval for consumer to sync topics, default: 15 seconds SyncTopicInterval time.Duration `json:"sync_topic_interval"` // Maximum number of goroutine for subscribing to topics, default: runtime.NumCPU() MaxConsumeGoroutines int `json:"max_consume_goroutines"` // max error count from consuming messages, set it to -1 to ignore error, default: 5 MaxConsumeErrorCount int `json:"max_consume_error_count"` // function which handles received messages from the Kafka broker MessageHandler MessageHandler `json:"-"` // Function used to sync topics, default: GetAllTopic GetTopics GetTopicsFunc `json:"-"` // logger implement logr.LogSinker, default: zapr.Logger Logger *logr.Logger `json:"-"` // Mechanism sasl authentication, default: nil Mechanism sasl.Mechanism `json:"-"` // TLS TLS configuration, default: nil TLS *tls.Config `json:"-"` // used when Config.logger is nil, follow the zap style level (https://pkg.go.dev/go.uber.org/zap/zapcore#Level), // setting the log level for zapr.Logger(config.logLevel should be in range[-1, 5]) // default: 0 -- InfoLevel LogLevel int `json:"log_level"` // enable verbose kafka-go log, default: false Verbose bool `json:"verbose"` }
ConsumerConfig configuration object used to create new instances of Consumer
@author kevineluo @update 2023-03-15 03:01:48
func (*ConsumerConfig) Validate ¶
func (config *ConsumerConfig) Validate() (err error)
Validate checks the config and sets default values.
@receiver config *ConsumerConfig @return err error @author kevineluo @update 2023-03-31 04:44:47
type GetTopicsFunc ¶
GetTopicsFunc is the way to get the needed topics (implemented and provided by the user).
@return topics []string @return err error @author kevineluo @update 2023-03-28 07:16:54
func GetAllTopic ¶
func GetAllTopic() GetTopicsFunc
GetAllTopic is a function decorator for getting all topics; it returns a GetTopicsFunc.
@return GetTopicsFunc @author kevineluo @update 2023-03-15 03:14:57
func GetTopicReMatch ¶
func GetTopicReMatch(reList []string) GetTopicsFunc
GetTopicReMatch is a function decorator for getting topics by regex match; it returns a GetTopicsFunc that yields the matches found (resTopics) and an err if applicable.
@param reList []string @return GetTopicsFunc @author kevineluo @update 2023-03-29 03:22:56
type MessageHandler ¶
MessageHandler function which handles received messages from the Kafka broker.
@param msg *kafka.Message @param consumer *Consumer @return err error @author kevineluo @update 2023-03-28 07:16:44
type Producer ¶
type Producer struct { ProducerConfig // contains filtered or unexported fields }
Producer struct holds data related to the producer
@author kevineluo @update 2023-03-15 10:30:44
func NewProducer ¶
func NewProducer(ctx context.Context, config ProducerConfig) (p *Producer, err error)
NewProducer creates a new Kafka producer.
@param ctx context.Context @param config ProducerConfig @return p *Producer @return err error @author kevineluo @update 2023-03-15 10:52:06
NewProducer creates a new producer with the given config. A producer is a wrapper of a kafka writer with some additional features such as topic auto creation, message batching, etc. Note that the producer is not thread-safe. If you want to use one producer in multiple goroutines, you should do the synchronization by yourself.
func (*Producer) Close ¶
Close closes the producer.
@receiver producer *Producer @return error @author kevineluo @update 2023-03-15 02:43:18
func (*Producer) Closed ¶ added in v0.0.2
func (producer *Producer) Closed() <-chan struct{}
Closed checks whether the Producer is closed.
@receiver producer *Producer @return <-chan struct{} @author kevineluo @update 2023-03-30 05:11:40
func (*Producer) WriteMessages ¶
WriteMessages writes messages to Kafka; note that all messages should set their own topic.
@receiver producer *Producer @param ctx context.Context @param msgs ...kafka.Message @return err error @author kevineluo @update 2023-03-29 02:46:03
type ProducerConfig ¶
type ProducerConfig struct { // kafka bootstrap, default: "localhost:9092" Bootstrap string `json:"bootstrap"` // determine synchronously / asynchronously write messages, default: false Async bool `json:"async"` // By default kafka has the auto.create.topics.enable='true', you can ignore this config. // When this config is true, the producer will attempt to create the topic prior to publishing the message; // otherwise it will return an error upon encountering a missing topic, // default: false AllowAutoTopicCreation bool `json:"allow_auto_topic_creation"` // logger implement logr.LogSinker, default: zapr.Logger Logger *logr.Logger `json:"-"` // Mechanism sasl authentication, default: nil Mechanism sasl.Mechanism `json:"-"` // TLS TLS configuration, default: nil TLS *tls.Config `json:"-"` // used when Config.logger is nil, follow the zap style level (https://pkg.go.dev/go.uber.org/zap/zapcore#Level), // setting the log level for zapr.Logger(config.logLevel should be in range[-1, 5]) // default: 0 -- InfoLevel LogLevel int `json:"log_level"` // enable verbose kafka-go log, default: false Verbose bool `json:"verbose"` }
ProducerConfig configuration object used to create new instances of Producer
@author kevineluo @update 2023-03-15 03:01:48
func (*ProducerConfig) Validate ¶
func (config *ProducerConfig) Validate() (err error)
Validate checks the config and sets default values.
@receiver config *ProducerConfig @return err error @author kevineluo @update 2023-03-15 03:19:23