Documentation ¶
Index ¶
- Constants
- Variables
- func NewKafkaConfig(options ...KafkaConfigOption) *kafka.ConfigMap
- func NewTokenSource(clientID, clientSecret, url string, refreshBefore time.Duration, ...) oauth2.TokenSource
- type Consumer
- type Event
- type Handler
- type HandlerBuilder
- type HandlerFunc
- type Header
- type KafkaConfigOption
- func WithAutoOffsetReset(offsetReset OffsetReset) KafkaConfigOption
- func WithBootstrapServers(servers []string) KafkaConfigOption
- func WithCommaSeparatedBootstrapServers(servers string) KafkaConfigOption
- func WithGroupID(groupID string) KafkaConfigOption
- func WithKeyValue(key string, value interface{}) KafkaConfigOption
- func WithLogConnectionClose(logClose bool) KafkaConfigOption
- func WithSessionTimeout(timeout time.Duration) KafkaConfigOption
- func WithTopicMetadataRefreshInterval(interval time.Duration) KafkaConfigOption
- type KafkaConsumerConfig
- type KafkaProducerConfig
- type Middleware
- type OffsetReset
- type Producer
- type Token
- type TopicPartition
Examples ¶
Constants ¶
const HeaderTrackingID = "X-Tracking-Id"
const OTelTracerName = "github.com/blacklane/go-libs/x/events"
OTelTracerName is the name to be used when getting the OTel tracer. TODO: make a method to create/get a tracer?
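For example, instrumentation code can obtain the tracer through the standard OpenTelemetry API. A minimal sketch, assuming the package is imported as events from github.com/blacklane/go-libs/x/events:

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"

	"github.com/blacklane/go-libs/x/events"
)

// tracer is obtained using the name this package documents for its instrumentation.
var tracer trace.Tracer = otel.Tracer(events.OTelTracerName)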
Variables ¶
var ErrConsumerAlreadyShutdown = errors.New("consumer already shutdown")
var ErrProducerIsAlreadyRunning = errors.New("producer is already running")
var ErrProducerNotHandlingEvents = errors.New("producer should be handling events")
var ErrShutdownTimeout = errors.New("shutdown timeout: not all handlers finished, not closing kafka client")
Functions ¶
func NewKafkaConfig ¶ added in v0.5.0
func NewKafkaConfig(options ...KafkaConfigOption) *kafka.ConfigMap
NewKafkaConfig creates a Kafka config object according to the Confluent documentation: https://docs.confluent.io/platform/current/installation/configuration
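A minimal sketch of building a config from the options documented below; the broker address and group ID are placeholders:

cfg := events.NewKafkaConfig(
	events.WithBootstrapServers([]string{"localhost:9092"}), // placeholder broker
	events.WithGroupID("my-consumer-group"),                 // placeholder group ID
	events.WithAutoOffsetReset(events.OffsetResetEarliest),
	events.WithSessionTimeout(30*time.Second),
)
// cfg is a *kafka.ConfigMap and can be passed to NewKafkaConsumerConfig or
// NewKafkaProducerConfig.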
func NewTokenSource ¶
func NewTokenSource(clientID, clientSecret, url string, refreshBefore time.Duration, httpClient http.Client) oauth2.TokenSource
NewTokenSource returns a simple oauth2.TokenSource implementation. It refreshes the token refreshBefore ahead of the token's expiration.
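A sketch of creating a token source; the credentials, token endpoint and timings are placeholders. The result can then be passed to KafkaConsumerConfig.WithOAuth or KafkaProducerConfig.WithOAuth:

tokenSource := events.NewTokenSource(
	"client-id",                            // placeholder client ID
	"client-secret",                        // placeholder client secret
	"https://auth.example.com/oauth/token", // placeholder token endpoint
	30*time.Second,                         // refresh the token 30s before it expires
	http.Client{Timeout: 5 * time.Second},
)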
Types ¶
type Consumer ¶
func NewKafkaConsumer ¶
func NewKafkaConsumer(config *KafkaConsumerConfig, topics []string, handlers ...Handler) (Consumer, error)
NewKafkaConsumer returns a Consumer which will send every message to all handlers and ignore any error returned by them. A middleware should handle the errors. To handle errors, whether `kafka.Error` messages or any other error while interacting with Kafka, register an error function on *KafkaConsumerConfig.
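A sketch of wiring up a consumer; the topic, handler body and error function are illustrative, and kafkaConfig is a *kafka.ConfigMap such as the one returned by NewKafkaConfig:

handler := events.HandlerFunc(func(ctx context.Context, e events.Event) error {
	log.Printf("received event with key %s", e.Key)
	return nil
})

config := events.NewKafkaConsumerConfig(kafkaConfig)
config.WithErrFunc(func(err error) { log.Printf("kafka error: %v", err) })

consumer, err := events.NewKafkaConsumer(config, []string{"my-topic"}, handler)
if err != nil {
	log.Fatalf("could not create consumer: %v", err)
}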
type Event ¶
type Event struct {
	Headers        Header
	TopicPartition TopicPartition
	Key            []byte
	Payload        []byte
}
type HandlerBuilder ¶
type HandlerBuilder struct {
// contains filtered or unexported fields
}
Example ¶
m1 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 1: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 1: after handler")
		return err
	})
})
m2 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 2: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 2: after handler")
		return err
	})
})
h := HandlerFunc(func(_ context.Context, _ Event) error {
	fmt.Println("handler")
	return nil
})

hb := HandlerBuilder{}
hb.AddHandler(h)
hb.UseMiddleware(m1, m2)

// HandlerBuilder.Build returns a slice as several handlers might be added
handler := hb.Build()[0]

err := handler.Handle(context.Background(), Event{})
if err != nil {
	fmt.Println("handler error: " + err.Error())
}
Output:

middleware 1: before handler
middleware 2: before handler
handler
middleware 2: after handler
middleware 1: after handler
Example (MultipleHandlers) ¶
m1 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 1: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 1: after handler")
		return err
	})
})
m2 := Middleware(func(handler Handler) Handler {
	return HandlerFunc(func(ctx context.Context, e Event) error {
		fmt.Println("middleware 2: before handler")
		err := handler.Handle(ctx, e)
		fmt.Println("middleware 2: after handler")
		return err
	})
})
h1 := HandlerFunc(func(_ context.Context, _ Event) error {
	fmt.Println("handler 1")
	return nil
})
h2 := HandlerFunc(func(_ context.Context, _ Event) error {
	fmt.Println("handler 2")
	return nil
})

hb := HandlerBuilder{}
hb.AddHandler(h1)
hb.UseMiddleware(m1, m2)
hb.AddHandler(h2)

// HandlerBuilder.Build returns a slice as several handlers might be added
handlers := hb.Build()
handler1 := handlers[0]
handler2 := handlers[1]

err := handler1.Handle(context.Background(), Event{})
if err != nil {
	fmt.Println("handler1 error: " + err.Error())
}

fmt.Print("\n")

err = handler2.Handle(context.Background(), Event{})
if err != nil {
	fmt.Println("handler2 error: " + err.Error())
}
Output:

middleware 1: before handler
middleware 2: before handler
handler 1
middleware 2: after handler
middleware 1: after handler

middleware 1: before handler
middleware 2: before handler
handler 2
middleware 2: after handler
middleware 1: after handler
func (*HandlerBuilder) AddHandler ¶
func (hb *HandlerBuilder) AddHandler(h Handler)
func (HandlerBuilder) Build ¶
func (hb HandlerBuilder) Build() []Handler
func (*HandlerBuilder) UseMiddleware ¶
func (hb *HandlerBuilder) UseMiddleware(m ...Middleware)
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, e Event) error
type KafkaConfigOption ¶ added in v0.5.0
func WithAutoOffsetReset ¶ added in v0.5.0
func WithAutoOffsetReset(offsetReset OffsetReset) KafkaConfigOption
WithAutoOffsetReset specifies what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because that data has been deleted).
func WithBootstrapServers ¶ added in v0.5.0
func WithBootstrapServers(servers []string) KafkaConfigOption
WithBootstrapServers sets the list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
func WithCommaSeparatedBootstrapServers ¶ added in v0.5.0
func WithCommaSeparatedBootstrapServers(servers string) KafkaConfigOption
WithCommaSeparatedBootstrapServers sets the list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
func WithGroupID ¶ added in v0.5.0
func WithGroupID(groupID string) KafkaConfigOption
WithGroupID sets a unique string that identifies the consumer group this consumer belongs to.
func WithKeyValue ¶ added in v0.5.0
func WithKeyValue(key string, value interface{}) KafkaConfigOption
func WithLogConnectionClose ¶ added in v0.5.0
func WithLogConnectionClose(logClose bool) KafkaConfigOption
func WithSessionTimeout ¶ added in v0.5.0
func WithSessionTimeout(timeout time.Duration) KafkaConfigOption
WithSessionTimeout sets the timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker.
func WithTopicMetadataRefreshInterval ¶ added in v0.5.0
func WithTopicMetadataRefreshInterval(interval time.Duration) KafkaConfigOption
type KafkaConsumerConfig ¶
type KafkaConsumerConfig struct {
// contains filtered or unexported fields
}
KafkaConsumerConfig holds all possible configurations for the kafka consumer. Use NewKafkaConsumerConfig to initialise it. To see the possible configurations, check its WithXXX methods and the *kafkaConfig WithXXX methods as well.
func NewKafkaConsumerConfig ¶
func NewKafkaConsumerConfig(config *kafka.ConfigMap) *KafkaConsumerConfig
NewKafkaConsumerConfig returns an initialised *KafkaConsumerConfig
func (KafkaConsumerConfig) WithErrFunc ¶
func (kc KafkaConsumerConfig) WithErrFunc(errFn func(error))
WithErrFunc sets a function to handle any error beyond producer delivery errors.
func (KafkaConsumerConfig) WithOAuth ¶
func (kc KafkaConsumerConfig) WithOAuth(tokenSource oauth2.TokenSource)
WithOAuth prepares to handle OAuth2. It'll set the kafka configurations:

	sasl.mechanism: OAUTHBEARER
	security.protocol: SASL_SSL

It'll override any existing value for sasl.mechanism and security.protocol.
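A sketch combining it with a token source from NewTokenSource; the broker address is a placeholder and is assumed to be a SASL_SSL listener:

config := events.NewKafkaConsumerConfig(events.NewKafkaConfig(
	events.WithBootstrapServers([]string{"broker.example.com:9093"}), // placeholder SASL_SSL listener
	events.WithGroupID("my-consumer-group"),
))
config.WithOAuth(tokenSource) // tokenSource created with NewTokenSource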
type KafkaProducerConfig ¶
type KafkaProducerConfig struct {
// contains filtered or unexported fields
}
KafkaProducerConfig holds all possible configurations for the kafka producer. Use NewKafkaProducerConfig to initialise it. To see the possible configurations, check its WithXXX methods and the *kafkaConfig WithXXX methods as well.
func NewKafkaProducerConfig ¶
func NewKafkaProducerConfig(config *kafka.ConfigMap) *KafkaProducerConfig
NewKafkaProducerConfig returns an initialised *KafkaProducerConfig
func (KafkaProducerConfig) WithErrFunc ¶
func (kc KafkaProducerConfig) WithErrFunc(errFn func(error))
WithErrFunc sets a function to handle any error beyond producer delivery errors.
func (*KafkaProducerConfig) WithEventDeliveryErrHandler ¶
func (pc *KafkaProducerConfig) WithEventDeliveryErrHandler(errHandler func(Event, error))
WithEventDeliveryErrHandler registers a delivery error handler to be called whenever a delivery fails.
func (*KafkaProducerConfig) WithFlushTimeout ¶
func (pc *KafkaProducerConfig) WithFlushTimeout(timeout int)
WithFlushTimeout sets the producer Flush timeout.
func (KafkaProducerConfig) WithOAuth ¶
func (kc KafkaProducerConfig) WithOAuth(tokenSource oauth2.TokenSource)
WithOAuth prepares to handle OAuth2. It'll set the kafka configurations:

	sasl.mechanism: OAUTHBEARER
	security.protocol: SASL_SSL

It'll override any existing value for sasl.mechanism and security.protocol.
type Middleware ¶
type Middleware func(Handler) Handler
type OffsetReset ¶ added in v0.5.0
type OffsetReset string
const (
	// OffsetResetEarliest automatically resets the offset to the earliest offset
	OffsetResetEarliest OffsetReset = "earliest"
	// OffsetResetLatest automatically resets the offset to the latest offset
	OffsetResetLatest OffsetReset = "latest"
	// OffsetResetNone throws an exception to the consumer if no previous offset is found for the consumer's group
	OffsetResetNone OffsetReset = "none"
)
type Producer ¶
type Producer interface {
	// Send sends an event to the given topic
	//
	// Deprecated: use SendCtx instead
	Send(event Event, topic string) error

	// SendCtx sends an event to the given topic.
	// It also adds the OTel propagation headers and the X-Tracking-Id if not set
	// already.
	SendCtx(ctx context.Context, eventName string, event Event, topic string) error

	// SendWithTrackingID adds the tracking ID to the event's headers and sends
	// it to the given topic
	//
	// Deprecated: use SendCtx instead
	SendWithTrackingID(trackingID string, event Event, topic string) error

	// HandleEvents starts to listen to the producer events channel
	HandleEvents() error

	// Shutdown gracefully shuts down the producer; it respects the context
	// timeout.
	Shutdown(ctx context.Context) error
}
func NewKafkaProducer ¶
func NewKafkaProducer(c *KafkaProducerConfig) (Producer, error)
NewKafkaProducer returns a new Producer. To handle errors, whether `kafka.Error` messages or any other error while interacting with Kafka, register an error function on *KafkaProducerConfig.
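A sketch of a producer lifecycle; the event name, topic, payload and timeouts are placeholders, and kafkaConfig is a *kafka.ConfigMap such as the one returned by NewKafkaConfig:

config := events.NewKafkaProducerConfig(kafkaConfig)
config.WithErrFunc(func(err error) { log.Printf("kafka error: %v", err) })
config.WithEventDeliveryErrHandler(func(e events.Event, err error) {
	log.Printf("delivery failed for key %s: %v", e.Key, err)
})
config.WithFlushTimeout(5000) // passed to the underlying Flush call; assumed to be milliseconds

producer, err := events.NewKafkaProducer(config)
if err != nil {
	log.Fatalf("could not create producer: %v", err)
}
if err := producer.HandleEvents(); err != nil {
	log.Fatalf("could not start handling delivery events: %v", err)
}

event := events.Event{Payload: []byte(`{"id":"42"}`)} // placeholder payload
if err := producer.SendCtx(context.Background(), "order.created", event, "my-topic"); err != nil {
	log.Printf("send failed: %v", err)
}

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := producer.Shutdown(shutdownCtx); err != nil {
	log.Printf("shutdown: %v", err) // e.g. ErrShutdownTimeout
}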
type Token ¶
type Token struct {
	AccessToken      string `json:"access_token"`
	ExpiresIn        int    `json:"expires_in"`
	RefreshExpiresIn int    `json:"refresh_expires_in"`
	RefreshToken     string `json:"refresh_token"`
	TokenType        string `json:"token_type"`
	NotBeforePolicy  int    `json:"not-before-policy"`
	SessionState     string `json:"session_state"`
	Scope            string `json:"scope"`
}
Token represents a JWT token. TODO: ensure it isn't used and remove it.
type TopicPartition ¶ added in v0.4.0
Source Files ¶

Directories ¶
Path | Synopsis
---|---
eventstest | Package eventstest is a generated GoMock package.
examples |
examples/oauth (Module) |
examples/without-auth (Module) |