kafkaclient

Version: v6.0.4
Published: Jun 10, 2024 | License: MIT | Imports: 17 | Imported by: 0

Documentation

Overview

Package kafkaclient extends the BaseApp with a Kafka consumer server.

Example
package main

import (
	"context"

	"github.com/sabariramc/goserverbase/v6/app/server/kafkaclient"
	"github.com/sabariramc/goserverbase/v6/errors"
	"github.com/sabariramc/goserverbase/v6/kafka"
)

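func main() {
	// Create a consumer server with no options passed.
	srv := kafkaclient.New()
	// Handler for the first topic: returns nil, i.e. no error.
	srv.AddHandler(context.Background(), "gobase.test.topic1", func(ctx context.Context, m *kafka.Message) error {
		return nil
	})
	// Handler for the second topic: always returns a custom error.
	srv.AddHandler(context.Background(), "gobase.test.topic2", func(ctx context.Context, m *kafka.Message) error {
		return &errors.CustomError{ErrorCode: "gobase.test.error", ErrorMessage: "error sample"}
	})
	// Start the client; see StartClient for what this sets up.
	srv.StartClient()
}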

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	*baseapp.Config               // Embeds for base config
	*kafka.ConsumerConfig         // Embeds for kafka consumer config
	HealthCheckInterval   uint    // Interval in seconds to do health check of various modules
	HealthCheckResultPath string  // Local disk file path for writing health check results
	Log                   log.Log // Logger instance.
	Tracer                Tracer  // Tracer instance.
}

Config holds the configuration for the application.

func GetDefaultConfig

func GetDefaultConfig() *Config

GetDefaultConfig creates a new Config with values from environment variables or default values.

Environment Variables
- KAFKACS__HEALTH_CHECK_INTERVAL: Sets [HealthCheckInterval]
- KAFKACS__HEALTH_CHECK_RESULT_PATH: Sets [HealthCheckResultPath]

type KafkaClient

type KafkaClient struct {
	*baseapp.BaseApp
	// contains filtered or unexported fields
}

KafkaClient represents a Kafka consumer server. It implements ShutdownHook, HealthCheckHook and StatusCheckHook.

func New

func New(option ...Options) *KafkaClient

New creates a new instance of KafkaClient.

func (*KafkaClient) AddHandler

func (k *KafkaClient) AddHandler(ctx context.Context, topicName string, handler KafkaEventProcessor)

AddHandler adds a handler for processing Kafka events for the specified topic.

func (*KafkaClient) Commit

func (k *KafkaClient) Commit(ctx context.Context) error

Commit commits the current offset of the Kafka consumer.

func (*KafkaClient) GetCorrelationParams

func (k *KafkaClient) GetCorrelationParams(headers map[string]string) *correlation.CorrelationParam

GetCorrelationParams extracts correlation parameters from the given headers and returns a CorrelationParam instance.

func (*KafkaClient) GetMessageContext

func (k *KafkaClient) GetMessageContext(msg *kafka.Message) context.Context

GetMessageContext creates a context for processing a Kafka message, populated with correlation parameters and the user identifier. If a tracer was passed during server initialization, it creates a new span for every message and updates the span attributes.

func (*KafkaClient) GetSpanFromContext

func (k *KafkaClient) GetSpanFromContext(ctx context.Context) (span.Span, bool)

GetSpanFromContext retrieves the OpenTelemetry span from the given context.

func (*KafkaClient) GetUserIdentifier

func (k *KafkaClient) GetUserIdentifier(headers map[string]string) *correlation.UserIdentifier

GetUserIdentifier extracts user identifier from the given headers and returns a UserIdentifier instance.

func (*KafkaClient) HealthCheck

func (k *KafkaClient) HealthCheck(ctx context.Context) error

HealthCheck runs a health check on the Kafka consumer server.

func (*KafkaClient) HealthCheckMonitor

func (k *KafkaClient) HealthCheckMonitor(ctx context.Context)

HealthCheckMonitor starts a health check monitor that periodically runs health checks.

func (*KafkaClient) Name

func (k *KafkaClient) Name(ctx context.Context) string

Name returns the name of the KafkaClient. It implements the hook interface defined in BaseApp.

func (*KafkaClient) ProcessEvent

func (k *KafkaClient) ProcessEvent(ctx context.Context, msg *kafka.Message, handler KafkaEventProcessor)

ProcessEvent processes a Kafka message using the specified handler.

func (*KafkaClient) Shutdown

func (k *KafkaClient) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the Kafka consumer server. It implements the shutdown hook.

func (*KafkaClient) StartClient added in v6.0.1

func (k *KafkaClient) StartClient()

StartClient starts the Kafka client. It also starts the background processes for message polling and signal monitoring, and sets up the cleanup steps that run when the server shuts down.

func (*KafkaClient) StatusCheck

func (k *KafkaClient) StatusCheck(ctx context.Context) (any, error)

StatusCheck runs a status check on the Kafka consumer server.

func (*KafkaClient) StoreMessage

func (k *KafkaClient) StoreMessage(ctx context.Context, msg *kafka.Message) error

StoreMessage stores the given Kafka message.

func (*KafkaClient) Subscribe

func (k *KafkaClient) Subscribe(ctx context.Context)

Subscribe subscribes to Kafka topics and starts consuming messages.

type KafkaEventProcessor

type KafkaEventProcessor func(context.Context, *kafka.Message) error

KafkaEventProcessor defines the function signature for Kafka event handlers.

type Options

type Options func(*Config)

Options represents options for configuring a KafkaClient instance.
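Because Options is a function over *Config, it follows the functional options pattern: a constructor starts from defaults and applies each option in order. The sketch below shows the pattern in isolation; the `config` fields, the option names, and the default values are hypothetical and do not reflect the package's actual Config or defaults.

```go
package main

import "fmt"

// config is a hypothetical, reduced stand-in for Config.
type config struct {
	ServiceName         string
	HealthCheckInterval uint
}

// option mutates a config, mirroring the shape of Options.
type option func(*config)

// withServiceName is a hypothetical option that sets the service name.
func withServiceName(name string) option {
	return func(c *config) { c.ServiceName = name }
}

// withHealthCheckInterval is a hypothetical option that sets the interval in seconds.
func withHealthCheckInterval(seconds uint) option {
	return func(c *config) { c.HealthCheckInterval = seconds }
}

// newConfig starts from assumed defaults and applies the options in order,
// so a later option overrides an earlier one that touches the same field.
func newConfig(opts ...option) *config {
	cfg := &config{ServiceName: "default", HealthCheckInterval: 30}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newConfig(withServiceName("orders"), withHealthCheckInterval(10))
	fmt.Println(cfg.ServiceName, cfg.HealthCheckInterval)
}
```

The pattern keeps the constructor signature stable as new settings are added, which is why calls such as New() with no arguments and New(WithLog(...), WithTracer(...)) can coexist.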

func WithKafkaConsumerConfig

func WithKafkaConsumerConfig(config *kafka.ConsumerConfig) Options

WithKafkaConsumerConfig sets the Kafka consumer configuration for KafkaClient.

func WithLog

func WithLog(log log.Log) Options

WithLog sets the log instance for KafkaClient.

func WithNotifier

func WithNotifier(notifier notifier.Notifier) Options

WithNotifier sets the notifier instance for KafkaClient.

func WithServerConfig

func WithServerConfig(config *baseapp.Config) Options

WithServerConfig sets the server configuration for KafkaClient.

func WithTracer

func WithTracer(t Tracer) Options

WithTracer sets the tracer instance for KafkaClient.

type Tracer

type Tracer interface {
	StartKafkaSpanFromMessage(ctx context.Context, msg *ckafka.Message) (context.Context, span.Span)
	span.SpanOp
}

Tracer defines the interface for tracing functionality.
