Documentation
¶
Index ¶
- Constants
- func DefaultKafkaConfig() *kafka.ConfigMap
- func MessagePartitionFromCtx(ctx context.Context) (int32, bool)
- func MessagePartitionOffsetFromCtx(ctx context.Context) (int64, bool)
- func MessageTimestampFromCtx(ctx context.Context) (time.Time, bool)
- type DefaultMarshaler
- type Marshaler
- type MarshalerUnmarshaler
- type Message
- type Publisher
- type PublisherConfig
- type Subscriber
- type SubscriberConfig
- type Unmarshaler
Constants ¶
View Source
const UUIDHeaderKey = "_watermill_message_uuid"
Variables ¶
This section is empty.
Functions ¶
func DefaultKafkaConfig ¶
DefaultKafkaSubscriberConfig creates default kafka.ConfigMap used by Publishers / Subscribers.
Custom config can be passed to NewSubscriber and NewPublisher.
kafkaConfig := DefaultKafkaSubscriberConfig() kafkaConfig.Set("session.timeout.ms=90000") subscriberConfig.KafkaConfig = kafkaConfig subscriber, err := NewSubscriber(subscriberConfig, logger) // ...
func MessagePartitionFromCtx ¶
MessagePartitionFromCtx returns Kafka partition of the consumed message
func MessagePartitionOffsetFromCtx ¶
MessagePartitionOffsetFromCtx returns Kafka partition offset of the consumed message
Types ¶
type DefaultMarshaler ¶
type DefaultMarshaler struct{}
Default JSON Marshaler
type MarshalerUnmarshaler ¶
type MarshalerUnmarshaler interface { Marshaler Unmarshaler }
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)
NewPublisher creates a new Kafka Publisher.
type PublisherConfig ¶
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)
NewSubscriber creates a new Kafka Subscriber.
func (*Subscriber) Close ¶
func (s *Subscriber) Close() (err error)
type SubscriberConfig ¶
type SubscriberConfig struct { // Kafka brokers list. Brokers []string // Unmarshaler is used to unmarshal messages from Kafka format into Watermill format. Unmarshaler Unmarshaler // Kafka ConfigMap KafkaConfig *kafka.ConfigMap // ConsumerGroup is the group.id set for kafka.Consumer ConsumerGroup string }
Click to show internal directories.
Click to hide internal directories.