Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConcurrentRouter ¶
ConcurrentRouter is a wrapper around a generic Router which controls concurrent access. This allows one to add new routes even while handling events. Simply wrap your existing Router in a ConcurrentRouter.
func (ConcurrentRouter) Handle ¶
func (c ConcurrentRouter) Handle(event kafka.Event)
func (ConcurrentRouter) NewRoute ¶
func (c ConcurrentRouter) NewRoute(key catchall.Key, handler Handler)
func (ConcurrentRouter) Run ¶
func (c ConcurrentRouter) Run() (stop chan bool, err error)
type Consumer ¶
type Consumer interface { // Run starts the consumer. It is necessary to call this method, otherwise the Consumer // won't consume any events. Run() (stop chan bool, err error) }
Consumer is the generic interface for all consumers.
type Handler ¶
Handler is a function used in a KeyRouter which handles a single kind of message. The Handler may return an error that will be logged.
type JSONProducer ¶
type KeyRouter ¶
type KeyRouter interface { Consumer // NewRoute registers a handler for the given key. Only one handler may exist per key. NewRoute(key catchall.Key, handler Handler) // Handle routes a single Event either to the appropriate Handler function or to a generic error handler. // Note that this method may block. Handle(event kafka.Event) }
KeyRouter is a generic router for Kafka which routes consumed messages to a set of configured handlers based on a key.
type LowLevelProducer ¶
type LowLevelProducer interface { // ProduceSimpleSync sends a message without headers or key into a specific topic and partition. // It is mainly used for testing. ProduceSimpleSync(topic string, partition int32, value []byte) error // ProduceSync synchronously produces a Message. ProduceSync(message *kafka.Message) error }
LowLevelProducer provides access to synchronous sending routines.
type Producer ¶
Producer is a wrapper for kafka.Producer which handles delivery failure in a canonical way.
func NewKafkaProducer ¶
NewKafkaProducer constructs a new Producer that will produce into the given Kafka broker.
func (*Producer) ProduceJSONSync ¶
func (*Producer) ProduceSimpleSync ¶
type TopicRouter ¶
TopicRouter is a Router that routes based on the Kafka topic of consumed messages.
func NewTopicRouter ¶
func NewTopicRouter(consumer *kafka.Consumer) TopicRouter
func (TopicRouter) Handle ¶
func (t TopicRouter) Handle(event kafka.Event)
func (TopicRouter) Run ¶
func (t TopicRouter) Run() (chan bool, error)
func (TopicRouter) Topics ¶
func (t TopicRouter) Topics() []string