Documentation
¶
Index ¶
- Constants
- type AdminConfig
- type ConsumerConfig
- type ConsumerFunc
- type KafkaAdmin
- func (c *KafkaAdmin) Connect() error
- func (c *KafkaAdmin) CreateTopic(topic string, numPartitions int, replicationFactor int) error
- func (c *KafkaAdmin) DeleteTopic(topic string) error
- func (c *KafkaAdmin) Disconnect()
- func (c *KafkaAdmin) GetTopics(topics ...string) ([]kafka.Partition, error)
- func (c *KafkaAdmin) IsConnected() bool
- func (c *KafkaAdmin) ListTopics() ([]string, error)
- func (c *KafkaAdmin) TopicExists(topic string) (bool, error)
- type KafkaConsumer
- func (c *KafkaConsumer) ChannelSubscribe(ch chan Message) error
- func (c *KafkaConsumer) Connect()
- func (c *KafkaConsumer) Disconnect()
- func (c *KafkaConsumer) GetConfig() *kafka.ReaderConfig
- func (c *KafkaConsumer) IsConnected() bool
- func (c *KafkaConsumer) ReadMessage() (Message, error)
- func (c *KafkaConsumer) Rewind() error
- func (c *KafkaConsumer) Subscribe(handler ConsumerFunc) error
- func (c *KafkaConsumer) SubscribeWithOffsets(handler ConsumerFunc) error
- type KafkaProducer
- func (p *KafkaProducer) Disconnect()
- func (p *KafkaProducer) IsConnected() bool
- func (p *KafkaProducer) Write(value []byte, key ...[]byte) error
- func (p *KafkaProducer) WriteJson(data interface{}, key ...[]byte) error
- func (p *KafkaProducer) WriteMulti(values ...[]byte) error
- func (p *KafkaProducer) WriteMultiJson(values ...interface{}) error
- type Message
- type ProducerConfig
Constants ¶
const ( ErrMissingConsumerBroker = utils.Error("Missing Consumer broker address") ErrMissingConsumerTopic = utils.Error("Missing Consumer Topic name") ErrMissingConsumerGroup = utils.Error("Missing Consumer Group") ErrConsumerAlreadyConnected = utils.Error("Cannot change connection properties; already connected") DefaultTimeout = time.Second * 30 ErrMissingProducerBroker = utils.Error("Missing Producer broker address") ErrMissingProducerTopic = utils.Error("Missing Producer Topic name") ErrProducerClosed = utils.Error("Producer is already closed") ErrInvalidAuthType = utils.Error("Invalid authentication type") ErrMissingAdminBroker = utils.Error("Missing Admin broker address") ErrNilConfig = utils.Error("Config is nil") AuthTypeNone = "none" AuthTypePlain = "plain" AuthTypeScram256 = "scram256" AuthTypeScram512 = "scram512" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdminConfig ¶
type AdminConfig struct { Brokers string `json:"brokers"` AuthType string `json:"authType"` Username string `json:"username"` Password string `json:"password"` tlsProvider.ClientConfig }
func (AdminConfig) Validate ¶
func (c AdminConfig) Validate() error
type ConsumerConfig ¶
type ConsumerConfig struct { Brokers string `json:"brokers"` Topic string `json:"topic"` Group string `json:"group"` AuthType string `json:"authType"` Username string `json:"username"` Password string `json:"password"` tlsProvider.ClientConfig }
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
type ConsumerFunc ¶
ConsumerFunc is the Reader handler type
type KafkaAdmin ¶
type KafkaAdmin struct { Conn *kafka.Conn // contains filtered or unexported fields }
func NewAdmin ¶
func NewAdmin(ctx context.Context, cfg *AdminConfig) (*KafkaAdmin, error)
func (*KafkaAdmin) Connect ¶
func (c *KafkaAdmin) Connect() error
func (*KafkaAdmin) CreateTopic ¶
func (c *KafkaAdmin) CreateTopic(topic string, numPartitions int, replicationFactor int) error
CreateTopic creates a new Topic
func (*KafkaAdmin) DeleteTopic ¶
func (c *KafkaAdmin) DeleteTopic(topic string) error
DeleteTopic removes a Topic
func (*KafkaAdmin) Disconnect ¶
func (c *KafkaAdmin) Disconnect()
func (*KafkaAdmin) GetTopics ¶
func (c *KafkaAdmin) GetTopics(topics ...string) ([]kafka.Partition, error)
func (*KafkaAdmin) IsConnected ¶
func (c *KafkaAdmin) IsConnected() bool
func (*KafkaAdmin) ListTopics ¶
func (c *KafkaAdmin) ListTopics() ([]string, error)
ListTopics lists existing Kafka topics
func (*KafkaAdmin) TopicExists ¶
func (c *KafkaAdmin) TopicExists(topic string) (bool, error)
TopicExists returns true if Topic exists
type KafkaConsumer ¶
type KafkaConsumer struct { Brokers string Group string Topic string Reader *kafka.Reader // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer(ctx context.Context, cfg *ConsumerConfig) (*KafkaConsumer, error)
func (*KafkaConsumer) ChannelSubscribe ¶
func (c *KafkaConsumer) ChannelSubscribe(ch chan Message) error
ChannelSubscribe subscribes to a reader handler by channel. Note: this function is blocking.
func (*KafkaConsumer) Disconnect ¶
func (c *KafkaConsumer) Disconnect()
Disconnect disconnects from Kafka
func (*KafkaConsumer) GetConfig ¶
func (c *KafkaConsumer) GetConfig() *kafka.ReaderConfig
GetConfig gets the initial config object. Useful to set other options before connect.
func (*KafkaConsumer) IsConnected ¶
func (c *KafkaConsumer) IsConnected() bool
IsConnected returns true if Reader was initialized
func (*KafkaConsumer) ReadMessage ¶
func (c *KafkaConsumer) ReadMessage() (Message, error)
ReadMessage reads a single message from Kafka. It returns the Kafka message and an error. If there is no message available, it will block until a message is available. If an error occurs, it will be returned.
func (*KafkaConsumer) Rewind ¶
func (c *KafkaConsumer) Rewind() error
Rewind reads messages from the beginning
func (*KafkaConsumer) Subscribe ¶
func (c *KafkaConsumer) Subscribe(handler ConsumerFunc) error
Subscribe consumes a message from a topic using a handler. Note: this function is blocking.
func (*KafkaConsumer) SubscribeWithOffsets ¶
func (c *KafkaConsumer) SubscribeWithOffsets(handler ConsumerFunc) error
SubscribeWithOffsets manages a reader handler that explicitly commits offsets. Note: this function is blocking.
type KafkaProducer ¶
type KafkaProducer struct { Brokers string Topic string Writer *kafka.Writer // contains filtered or unexported fields }
func NewProducer ¶
func NewProducer(ctx context.Context, cfg *ProducerConfig) (*KafkaProducer, error)
func (*KafkaProducer) Disconnect ¶
func (p *KafkaProducer) Disconnect()
Disconnect disconnects from the Writer
func (*KafkaProducer) IsConnected ¶
func (p *KafkaProducer) IsConnected() bool
IsConnected returns true if Writer is connected
func (*KafkaProducer) Write ¶
func (p *KafkaProducer) Write(value []byte, key ...[]byte) error
Write writes a single message to topic
func (*KafkaProducer) WriteJson ¶
func (p *KafkaProducer) WriteJson(data interface{}, key ...[]byte) error
WriteJson writes a struct to a Topic as a JSON message
func (*KafkaProducer) WriteMulti ¶
func (p *KafkaProducer) WriteMulti(values ...[]byte) error
WriteMulti writes multiple messages to Topic
func (*KafkaProducer) WriteMultiJson ¶
func (p *KafkaProducer) WriteMultiJson(values ...interface{}) error
WriteMultiJson writes a slice of structs to a Topic as a JSON message
type Message ¶ added in v0.1.1
type Message = kafka.Message
Message is a type alias to avoid using kafka-go in application code
type ProducerConfig ¶
type ProducerConfig struct { Brokers string `json:"brokers"` Topic string `json:"topic"` AuthType string `json:"authType"` Username string `json:"username"` Password string `json:"password"` tlsProvider.ClientConfig }
func (ProducerConfig) Validate ¶
func (c ProducerConfig) Validate() error