Documentation
Overview ¶
Package otelkafka provides functions to trace the github.com/confluentinc/confluent-kafka-go/kafka package.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewMessageCarrier ¶
func NewMessageCarrier(message *kafka.Message) propagation.TextMapCarrier
NewMessageCarrier returns a TextMapCarrier that will encode and decode tracing information to and from the passed message.
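A carrier of this kind can be pictured as a thin adapter over the message headers. The following is a minimal sketch, not the package's implementation: Header and Message are hypothetical stand-ins that mirror the shape of kafka.Header and kafka.Message, and the choice to overwrite an existing header in Set is an assumption.

```go
package main

import "fmt"

// Header and Message are hypothetical stand-ins that mirror the shape of
// kafka.Header and kafka.Message; they are not the real types.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Headers []Header
}

// messageCarrier exposes the same three methods as
// propagation.TextMapCarrier: Get, Set and Keys.
type messageCarrier struct {
	msg *Message
}

// Get returns the value of the first header named key, or "" if absent.
func (c messageCarrier) Get(key string) string {
	for _, h := range c.msg.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set overwrites the first header named key, or appends a new header.
// Overwriting rather than appending is an assumption of this sketch.
func (c messageCarrier) Set(key, value string) {
	for i := range c.msg.Headers {
		if c.msg.Headers[i].Key == key {
			c.msg.Headers[i].Value = []byte(value)
			return
		}
	}
	c.msg.Headers = append(c.msg.Headers, Header{Key: key, Value: []byte(value)})
}

// Keys lists the header keys in the order they appear on the message.
func (c messageCarrier) Keys() []string {
	keys := make([]string, 0, len(c.msg.Headers))
	for _, h := range c.msg.Headers {
		keys = append(keys, h.Key)
	}
	return keys
}

func main() {
	msg := &Message{}
	carrier := messageCarrier{msg: msg}
	carrier.Set("traceparent", "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
	fmt.Println(carrier.Keys(), carrier.Get("traceparent"))
}
```

With the real package, a propagator would call Set on the carrier when injecting a span context into an outgoing message and Get when extracting one from a consumed message.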
Types ¶
type Config ¶
type Config struct {
	Tracer           trace.Tracer
	Propagator       propagation.TextMapPropagator
	DefaultStartOpts []trace.SpanStartOption
	// contains filtered or unexported fields
}
Config contains configuration options.
func NewConfig ¶
NewConfig returns a Config for instrumentation with all options applied.
If no TracerProvider or Propagator is specified with options, the default OpenTelemetry globals are used.
func (*Config) MergedSpanStartOptions ¶
func (c *Config) MergedSpanStartOptions(opts ...trace.SpanStartOption) []trace.SpanStartOption
MergedSpanStartOptions returns a copy of opts with any DefaultStartOpts that c is configured with prepended.
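The "copy with defaults prepended" behavior can be sketched as follows. SpanStartOption and Config here are hypothetical stand-ins (a string type and a one-field struct), so this shows only the slice handling, not the real option types. Placing defaults first presumably lets call-site options, which are applied later, take precedence where two options conflict.

```go
package main

import "fmt"

// SpanStartOption is a hypothetical stand-in for trace.SpanStartOption.
type SpanStartOption string

// Config is a hypothetical stand-in that keeps only the field needed here.
type Config struct {
	DefaultStartOpts []SpanStartOption
}

// mergedSpanStartOptions returns a new slice holding the configured defaults
// followed by opts. Neither input slice is modified or aliased.
func (c *Config) mergedSpanStartOptions(opts ...SpanStartOption) []SpanStartOption {
	merged := make([]SpanStartOption, 0, len(c.DefaultStartOpts)+len(opts))
	merged = append(merged, c.DefaultStartOpts...)
	merged = append(merged, opts...)
	return merged
}

func main() {
	c := &Config{DefaultStartOpts: []SpanStartOption{"default-attributes"}}
	fmt.Println(c.mergedSpanStartOptions("kind=consumer"))
}
```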
func (*Config) ResolveTracer ¶
ResolveTracer returns an OpenTelemetry tracer from the appropriate TracerProvider.
If the passed context contains a span, the TracerProvider that created the tracer that created that span will be used. Otherwise, the TracerProvider from c is used.
type Consumer ¶
Consumer wraps a kafka.Consumer and traces its operations.
func NewConsumer ¶
NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer with tracing instrumentation.
Example ¶
c, err := NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"group.id":          "myGroup",
})
if err != nil {
	panic(err)
}
defer c.Close()
func WrapConsumer ¶
WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func (*Consumer) Close ¶
Close calls the underlying Consumer.Close and, if polling is enabled, ends any remaining span.
func (*Consumer) Poll ¶
Poll polls the consumer for events. Message events are traced.
Will block for at most timeoutMs milliseconds.
The following callbacks may be triggered:
Subscribe()'s rebalanceCb
Returns nil on timeout; otherwise returns an Event.
func (*Consumer) ReadMessage ¶
ReadMessage polls the consumer for a message and traces the read.
This is a convenience API that wraps Poll() and only returns messages or errors. All other event types are discarded.
The call will block for at most `timeout` waiting for a new message or error. `timeout` may be set to -1 for indefinite wait.
A timeout is returned as (nil, err), where err satisfies `err.(kafka.Error).Code() == kafka.ErrTimedOut`.
Messages are returned as (msg, nil), while general errors are returned as (nil, err), and partition-specific errors are returned as (msg, err) where msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
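The four result shapes described above can be told apart with a small helper. This is only a sketch: ErrorCode, ErrTimedOut, ErrOther, Error and Message are hypothetical stand-ins for the corresponding kafka types, and classify is an illustrative name rather than part of any package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorCode, Error and Message are hypothetical stand-ins for
// kafka.ErrorCode, kafka.Error and kafka.Message.
type ErrorCode int

const (
	ErrTimedOut ErrorCode = iota + 1 // stand-in for kafka.ErrTimedOut
	ErrOther                         // any other error code
)

type Error struct{ code ErrorCode }

func (e Error) Error() string { return fmt.Sprintf("kafka error code %d", e.code) }

func (e Error) Code() ErrorCode { return e.code }

type Message struct {
	Topic     string
	Partition int32
}

// classify names the case that one ReadMessage-style (msg, err) result
// falls into, following the rules in the text above.
func classify(msg *Message, err error) string {
	var kerr Error
	switch {
	case err == nil:
		return "message"
	case errors.As(err, &kerr) && kerr.Code() == ErrTimedOut:
		return "timeout"
	case msg != nil:
		return "partition error"
	default:
		return "general error"
	}
}

func main() {
	msg := &Message{Topic: "orders", Partition: 0}
	fmt.Println(classify(msg, nil))
	fmt.Println(classify(nil, Error{code: ErrTimedOut}))
	fmt.Println(classify(nil, Error{code: ErrOther}))
	fmt.Println(classify(msg, Error{code: ErrOther}))
}
```

A consume loop would typically continue on a timeout, process the message in the success case, and log or abort on the two error cases.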
type Option ¶
type Option interface {
Apply(*Config)
}
Option applies options to a configuration.
func WithAttributes ¶
WithAttributes returns an Option that appends attr to the attributes set for every span created.
func WithPropagator ¶
func WithPropagator(p propagation.TextMapPropagator) Option
WithPropagator returns an Option that sets p as the TextMapPropagator used when propagating a span context.
func WithTracerProvider ¶
func WithTracerProvider(tp trace.TracerProvider) Option
WithTracerProvider returns an Option that sets the TracerProvider used for a configuration.
type OptionFunc ¶
type OptionFunc func(*Config)
OptionFunc is a generic way to set an option using a func.
func (OptionFunc) Apply ¶
func (o OptionFunc) Apply(c *Config)
Apply applies the configuration option.
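The Option and OptionFunc declarations above follow the common functional-options pattern. The sketch below shows how the pieces might fit together; Config is reduced to a single hypothetical field, withTracerName and newConfig are illustrative names only, and the body of Apply is an assumption about how such an adapter is usually written.

```go
package main

import "fmt"

// Config is a hypothetical stand-in with a single illustrative field.
type Config struct {
	TracerName string
}

// Option and OptionFunc repeat the declarations shown above.
type Option interface {
	Apply(*Config)
}

type OptionFunc func(*Config)

// Apply calls the function itself, which lets any func(*Config) act as an
// Option. This body is an assumption of the sketch.
func (o OptionFunc) Apply(c *Config) { o(c) }

// withTracerName is a hypothetical option used only for illustration.
func withTracerName(name string) Option {
	return OptionFunc(func(c *Config) { c.TracerName = name })
}

// newConfig applies opts in order, then fills in a default for anything
// the options left unset.
func newConfig(opts ...Option) Config {
	var c Config
	for _, o := range opts {
		o.Apply(&c)
	}
	if c.TracerName == "" {
		c.TracerName = "default"
	}
	return c
}

func main() {
	fmt.Println(newConfig().TracerName)
	fmt.Println(newConfig(withTracerName("custom")).TracerName)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.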
type Producer ¶
A Producer wraps a kafka.Producer and traces its operations.
func NewProducer ¶
NewProducer calls kafka.NewProducer and wraps the resulting Producer with tracing instrumentation.
Example ¶
p, err := NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
})
if err != nil {
	panic(err)
}
defer p.Close()
func WrapProducer ¶
WrapProducer wraps a kafka.Producer so that any produced events are traced.
func (*Producer) Close ¶
func (p *Producer) Close()
Close calls the wrapped Producer.Close and closes the producer channel.
func (*Producer) ProduceChannel ¶
ProduceChannel returns the traced producer channel.