kafka_golang

package
Version: v0.0.12
Published: Feb 24, 2020 License: Apache-2.0 Imports: 6 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConcurrentRouter

type ConcurrentRouter struct {
	Router KeyRouter
	sync.RWMutex
}

ConcurrentRouter wraps a KeyRouter and embeds a sync.RWMutex to control concurrent access to it, so new routes can be added while events are being handled. To use it, wrap an existing router in a ConcurrentRouter.
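The locking idea can be sketched without Kafka. The example below is a minimal illustration, not this package's implementation: Key, Handler, and lockedRouter are hypothetical stand-ins for catchall.Key, Handler, and ConcurrentRouter, and the choice to release the read lock before calling the handler is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Key and Handler are hypothetical stand-ins for catchall.Key and this
// package's Handler type, so the sketch compiles without a Kafka client.
type Key string
type Handler func(payload string) error

// lockedRouter guards a handler map with a sync.RWMutex. It is used through
// a pointer so that every caller shares the same lock.
type lockedRouter struct {
	mu       sync.RWMutex
	handlers map[Key]Handler
}

func newLockedRouter() *lockedRouter {
	return &lockedRouter{handlers: make(map[Key]Handler)}
}

// NewRoute takes the write lock, so it waits for in-flight lookups to finish.
func (r *lockedRouter) NewRoute(key Key, h Handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[key] = h
}

// Handle holds the read lock only for the lookup. The handler runs without
// the lock, so a handler may itself register routes without deadlocking.
func (r *lockedRouter) Handle(key Key, payload string) (handled bool, err error) {
	r.mu.RLock()
	h, ok := r.handlers[key]
	r.mu.RUnlock()
	if !ok {
		return false, nil
	}
	return true, h(payload)
}

func main() {
	r := newLockedRouter()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := Key(fmt.Sprintf("topic-%d", i))
			// Registration and dispatch interleave safely across goroutines.
			r.NewRoute(key, func(string) error { return nil })
			r.Handle(key, "payload")
		}(i)
	}
	wg.Wait()
	fmt.Println("routes registered:", len(r.handlers))
}
```

Many goroutines can look up handlers at the same time under the read lock, while registration briefly takes exclusive access.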

func (ConcurrentRouter) Handle

func (c ConcurrentRouter) Handle(event kafka.Event)

func (ConcurrentRouter) NewRoute

func (c ConcurrentRouter) NewRoute(key catchall.Key, handler Handler)

func (ConcurrentRouter) Run

func (c ConcurrentRouter) Run() (stop chan bool, err error)

type Consumer

type Consumer interface {
	// Run starts the consumer. It is necessary to call this method, otherwise the Consumer
	// won't consume any events.
	Run() (stop chan bool, err error)
}

Consumer is the generic interface for all consumers.

type Handler

type Handler func(event *kafka.Message) error

Handler is a function registered with a KeyRouter to handle a single kind of message. If the Handler returns an error, the error is logged.

type JSONProducer

type JSONProducer interface {
	// ProduceJSONSync produces a message without headers and key by JSON-encoding the given value.
	ProduceJSONSync(topic string, partition int32, value interface{}) error
}
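As a rough illustration of what JSON-encoding a value before producing could look like, the sketch below marshals a value with encoding/json and forwards the bytes to a method shaped like ProduceSimpleSync. The names byteProducer, memoryProducer, produceJSON, and order are hypothetical; the package's actual ProduceJSONSync may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// byteProducer mirrors the ProduceSimpleSync method of LowLevelProducer.
type byteProducer interface {
	ProduceSimpleSync(topic string, partition int32, value []byte) error
}

// memoryProducer is a hypothetical in-memory fake that records payloads
// instead of sending them to a broker.
type memoryProducer struct {
	sent [][]byte
}

func (m *memoryProducer) ProduceSimpleSync(topic string, partition int32, value []byte) error {
	m.sent = append(m.sent, value)
	return nil
}

// produceJSON is a hypothetical helper: it JSON-encodes value and forwards
// the bytes. Nothing is sent if encoding fails.
func produceJSON(p byteProducer, topic string, partition int32, value interface{}) error {
	payload, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("encoding value for topic %q: %w", topic, err)
	}
	return p.ProduceSimpleSync(topic, partition, payload)
}

// order is an example payload type invented for this sketch.
type order struct {
	ID    int    `json:"id"`
	State string `json:"state"`
}

func main() {
	p := &memoryProducer{}
	if err := produceJSON(p, "orders", 0, order{ID: 7, State: "paid"}); err != nil {
		fmt.Println("produce failed:", err)
		return
	}
	fmt.Println(string(p.sent[0])) // {"id":7,"state":"paid"}
}
```

Accepting the small byteProducer interface rather than a concrete producer keeps the encoding step testable without a running broker.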

type KeyRouter

type KeyRouter interface {
	Consumer
	// NewRoute registers a handler for the given key. Only one handler may exist per key.
	NewRoute(key catchall.Key, handler Handler)
	// Handle routes a single Event either to the appropriate Handler function or to a generic error handler.
	// Note that this method may block.
	Handle(event kafka.Event)
}

KeyRouter is a generic router for Kafka which routes consumed messages to a set of configured handlers based on a key.
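The routing behaviour described here (one handler per key, a fallback for unrouted messages, logged handler errors) can be sketched with a plain map. This is an illustration under assumptions, not the package's code: Key, Message, and mapRouter are hypothetical stand-ins, and the log lines are collected in a slice purely so the example is easy to inspect.

```go
package main

import (
	"errors"
	"fmt"
)

// Key and Message are hypothetical stand-ins for catchall.Key and
// kafka.Message, so the sketch runs without a Kafka client.
type Key string

type Message struct {
	Key   Key
	Value []byte
}

type Handler func(msg *Message) error

// mapRouter keeps at most one handler per key and records log lines.
type mapRouter struct {
	handlers map[Key]Handler
	logs     []string
}

func newMapRouter() *mapRouter {
	return &mapRouter{handlers: make(map[Key]Handler)}
}

// NewRoute registers h for key, replacing any handler registered earlier.
func (r *mapRouter) NewRoute(key Key, h Handler) {
	r.handlers[key] = h
}

// Handle dispatches msg to its handler. Unrouted messages and handler
// errors are recorded rather than returned to the caller.
func (r *mapRouter) Handle(msg *Message) {
	h, ok := r.handlers[msg.Key]
	if !ok {
		r.logs = append(r.logs, fmt.Sprintf("no route for key %q", msg.Key))
		return
	}
	if err := h(msg); err != nil {
		r.logs = append(r.logs, fmt.Sprintf("handler for key %q failed: %v", msg.Key, err))
	}
}

func main() {
	r := newMapRouter()
	r.NewRoute("orders", func(*Message) error { return errors.New("bad payload") })

	r.Handle(&Message{Key: "orders"})  // handler runs and fails
	r.Handle(&Message{Key: "unknown"}) // no handler registered

	for _, line := range r.logs {
		fmt.Println(line)
	}
}
```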

type LowLevelProducer

type LowLevelProducer interface {
	// ProduceSimpleSync sends a message without headers or key into a specific topic and partition.
	// It is mainly used for testing.
	ProduceSimpleSync(topic string, partition int32, value []byte) error
	// ProduceSync synchronously produces a Message.
	ProduceSync(message *kafka.Message) error
}

LowLevelProducer provides access to synchronous sending routines.

type Producer

type Producer struct {
	Producer *kafka.Producer
}

Producer is a wrapper for kafka.Producer which handles delivery failure in a canonical way.

func NewKafkaProducer

func NewKafkaProducer(broker string) (*Producer, error)

NewKafkaProducer constructs a new Producer that produces to the given Kafka broker.

func (*Producer) ProduceJSONSync

func (k *Producer) ProduceJSONSync(topic string, partition int32, value interface{}) error

func (*Producer) ProduceSimpleSync

func (k *Producer) ProduceSimpleSync(topic string, partition int32, value []byte) error

func (*Producer) ProduceSync

func (k *Producer) ProduceSync(message *kafka.Message) error

type TopicRouter

type TopicRouter struct {
	*kafka.Consumer
	Handlers map[catchall.Key]Handler
}

TopicRouter is a KeyRouter that routes consumed messages by their Kafka topic.

func NewTopicRouter

func NewTopicRouter(consumer *kafka.Consumer) TopicRouter

func (TopicRouter) Handle

func (t TopicRouter) Handle(event kafka.Event)

func (TopicRouter) NewRoute

func (t TopicRouter) NewRoute(topic catchall.Key, handler Handler)

func (TopicRouter) Run

func (t TopicRouter) Run() (chan bool, error)

func (TopicRouter) Topics

func (t TopicRouter) Topics() []string
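Topics is undocumented. One plausible reading, stated here as an assumption, is that it returns the topics for which handlers have been registered. The sketch below derives such a list from a handler map; the function topics and the simplified Handler type are hypothetical, and sorting is added only to make the result deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// Handler is a simplified, hypothetical stand-in for this package's Handler.
type Handler func(value []byte) error

// topics returns the keys of the handler map in sorted order. Go map
// iteration order is random, so sorting gives a stable result.
func topics(handlers map[string]Handler) []string {
	names := make([]string, 0, len(handlers))
	for topic := range handlers {
		names = append(names, topic)
	}
	sort.Strings(names)
	return names
}

func main() {
	handlers := map[string]Handler{
		"payments": func([]byte) error { return nil },
		"orders":   func([]byte) error { return nil },
	}
	fmt.Println(topics(handlers)) // [orders payments]
}
```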
