kafkacarrier

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2025 License: MIT Imports: 2 Imported by: 0

Documentation

Overview

Package kafkacarrier provides an implementation of the OpenTelemetry TextMapCarrier interface for Kafka message headers. It facilitates injecting and extracting trace context into Kafka message headers, enabling distributed tracing over Kafka messaging systems.

This package defines the KafkaHeaderCarrier type, which adapts the []kafka.Header type to implement the TextMapCarrier interface used by OpenTelemetry propagators.

Usage

## Injecting trace context into Kafka headers before publishing a message:

// Initialize Kafka headers and carrier
var headers []kafka.Header
carrier := &kafkacarrier.KafkaHeaderCarrier{Headers: headers}

// Propagate the trace context
traceflow.PropagateTraceContext(ctx, carrier)

// Create and publish the Kafka message with headers
message := kafka.Message{
    Topic:   "topic",
    Value:   messageData,
    Headers: carrier.Headers(),
}
err := kafkaWriter.WriteMessages(ctx, message)

## Extracting trace context from Kafka headers upon receiving a message:

func handleMessage(msg kafka.Message) {
    // Create carrier from Kafka headers
    carrier := &kafkacarrier.KafkaHeaderCarrier{Headers: msg.Headers}

    // Extract the trace context
    ctx := traceflow.ExtractTraceContext(context.Background(), carrier)

    // Start a new span with the extracted context
    span := traceflow.New(ctx, "kafka").Start("ConsumeEvent")
    defer span.End()

    // Process the message
    // ...
}

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaHeadersCarrier

type KafkaHeadersCarrier struct {
	Headers *[]kafka.Header
}

KafkaHeadersCarrier is a custom carrier for Kafka headers.

func New

func New(headers *[]kafka.Header) *KafkaHeadersCarrier

New creates a new KafkaHeadersCarrier.

func (*KafkaHeadersCarrier) Get

func (c *KafkaHeadersCarrier) Get(key string) string

Get retrieves the value of the header with the given key.

func (*KafkaHeadersCarrier) Keys

func (c *KafkaHeadersCarrier) Keys() []string

Keys returns the keys of the headers.

func (*KafkaHeadersCarrier) Set

func (c *KafkaHeadersCarrier) Set(key, value string)

Set sets the value of the header with the given key.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳