Documentation ¶
Overview ¶
Package kafkacarrier implements the OpenTelemetry TextMapCarrier interface for Kafka message headers. It lets propagators inject trace context into Kafka message headers and extract it again on the consuming side, enabling distributed tracing across Kafka-based messaging.
The package defines the KafkaHeadersCarrier type, which wraps a pointer to a []kafka.Header slice and implements the TextMapCarrier interface used by OpenTelemetry propagators.
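The adapter idea described above can be sketched without any external dependencies. This is a minimal illustration, not the package's actual source: `Header` is a local stand-in modelled on the key plus byte-slice shape of a Kafka header, `TextMapCarrier` is a local copy of the three-method OpenTelemetry interface, and `headersCarrier` is a hypothetical name. The overwrite-or-append behavior of `Set` is an assumption; the real type may handle duplicate keys differently.

```go
package main

import "fmt"

// Header is a local stand-in for kafka.Header (assumed shape: key plus bytes).
type Header struct {
	Key   string
	Value []byte
}

// TextMapCarrier mirrors the method set of OpenTelemetry's
// propagation.TextMapCarrier interface.
type TextMapCarrier interface {
	Get(key string) string
	Set(key, value string)
	Keys() []string
}

// headersCarrier is a hypothetical carrier backed by a pointer to a header slice.
type headersCarrier struct {
	headers *[]Header
}

// Get returns the value of the first header matching key, or "" if none matches.
func (c *headersCarrier) Get(key string) string {
	for _, h := range *c.headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

// Set overwrites the first header matching key, or appends a new header.
// (Assumed behavior; the real package may differ.)
func (c *headersCarrier) Set(key, value string) {
	for i := range *c.headers {
		if (*c.headers)[i].Key == key {
			(*c.headers)[i].Value = []byte(value)
			return
		}
	}
	*c.headers = append(*c.headers, Header{Key: key, Value: []byte(value)})
}

// Keys lists the header keys in slice order.
func (c *headersCarrier) Keys() []string {
	keys := make([]string, 0, len(*c.headers))
	for _, h := range *c.headers {
		keys = append(keys, h.Key)
	}
	return keys
}

// Compile-time check that headersCarrier satisfies the interface.
var _ TextMapCarrier = (*headersCarrier)(nil)

func main() {
	var headers []Header
	c := &headersCarrier{headers: &headers}
	c.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(c.Keys(), len(headers))
}
```

Because the carrier holds a pointer, the header written by `Set` ends up in the caller's `headers` slice, which can then be attached to an outgoing message.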
Usage ¶
Injecting trace context into Kafka headers before publishing a message:
	// Initialize the Kafka headers and wrap them in a carrier.
	var headers []kafka.Header
	carrier := kafkacarrier.New(&headers)

	// Propagate the trace context into the headers.
	traceflow.PropagateTraceContext(ctx, carrier)

	// Create and publish the Kafka message with the populated headers.
	message := kafka.Message{
		Topic:   "topic",
		Value:   messageData,
		Headers: headers,
	}
	if err := kafkaWriter.WriteMessages(ctx, message); err != nil {
		// Handle the publish error.
	}
Extracting trace context from Kafka headers upon receiving a message:
	func handleMessage(msg kafka.Message) {
		// Create a carrier from the received Kafka headers.
		carrier := kafkacarrier.New(&msg.Headers)

		// Extract the trace context.
		ctx := traceflow.ExtractTraceContext(context.Background(), carrier)

		// Start a new span with the extracted context.
		span := traceflow.New(ctx, "kafka").Start("ConsumeEvent")
		defer span.End()

		// Process the message
		// ...
	}
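To make concrete what "trace context in the headers" can look like on the consuming side, here is a hedged sketch that reads a W3C `traceparent` value out of a header slice. It assumes the propagator in use writes the W3C Trace Context format, which this page does not confirm. `Header`, `traceParent`, and `parseTraceParent` are hypothetical names for illustration, and only the version `00` layout (four dash-separated fields) is handled.

```go
package main

import (
	"fmt"
	"strings"
)

// Header is a local stand-in for kafka.Header (assumed shape: key plus bytes).
type Header struct {
	Key   string
	Value []byte
}

// traceParent holds the four fields of a version-00 W3C traceparent value.
type traceParent struct {
	Version string
	TraceID string
	SpanID  string
	Flags   string
}

// parseTraceParent finds the first "traceparent" header and splits it into
// its fields. It reports false if the header is absent or malformed.
func parseTraceParent(headers []Header) (traceParent, bool) {
	for _, h := range headers {
		if h.Key != "traceparent" {
			continue
		}
		parts := strings.Split(string(h.Value), "-")
		// Expected hex field widths: 2 (version), 32 (trace ID), 16 (span ID), 2 (flags).
		if len(parts) != 4 || len(parts[0]) != 2 || len(parts[1]) != 32 ||
			len(parts[2]) != 16 || len(parts[3]) != 2 {
			return traceParent{}, false
		}
		return traceParent{
			Version: parts[0],
			TraceID: parts[1],
			SpanID:  parts[2],
			Flags:   parts[3],
		}, true
	}
	return traceParent{}, false
}

func main() {
	headers := []Header{
		{Key: "traceparent", Value: []byte("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")},
	}
	if tp, ok := parseTraceParent(headers); ok {
		fmt.Println("trace:", tp.TraceID, "parent span:", tp.SpanID)
	}
}
```

In practice a propagator does this parsing through the carrier's `Get` method; the sketch only shows the data that travels with the message.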
Types ¶
type KafkaHeadersCarrier ¶
type KafkaHeadersCarrier struct {
Headers *[]kafka.Header
}
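KafkaHeadersCarrier is a carrier that adapts Kafka message headers to the OpenTelemetry TextMapCarrier interface. Headers points to the header slice that the carrier reads from and writes to.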
func New ¶
func New(headers *[]kafka.Header) *KafkaHeadersCarrier
New creates a new KafkaHeadersCarrier that wraps the given header slice.
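New takes a pointer to the slice rather than the slice itself. A plausible reason, sketched below with a local stand-in `Header` type and hypothetical helper names, is that appending through a pointer changes the caller's slice, while appending to a slice passed by value does not. Whether the package appends in exactly this way is an assumption.

```go
package main

import "fmt"

// Header is a local stand-in for kafka.Header (assumed shape: key plus bytes).
type Header struct {
	Key   string
	Value []byte
}

// appendByValue appends to a copy of the slice header, so the caller's
// slice keeps its original length.
func appendByValue(hs []Header, h Header) {
	hs = append(hs, h)
	_ = hs
}

// appendByPointer appends through the pointer, so the caller's slice grows.
func appendByPointer(hs *[]Header, h Header) {
	*hs = append(*hs, h)
}

func main() {
	var headers []Header
	h := Header{Key: "traceparent", Value: []byte("example")}

	appendByValue(headers, h)
	fmt.Println(len(headers)) // prints 0

	appendByPointer(&headers, h)
	fmt.Println(len(headers)) // prints 1
}
```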
func (*KafkaHeadersCarrier) Get ¶
func (c *KafkaHeadersCarrier) Get(key string) string
Get retrieves the value of the header with the given key.
func (*KafkaHeadersCarrier) Keys ¶
func (c *KafkaHeadersCarrier) Keys() []string
Keys returns the keys of the headers.
func (*KafkaHeadersCarrier) Set ¶
func (c *KafkaHeadersCarrier) Set(key, value string)
Set sets the value of the header with the given key.