Documentation
¶
Overview ¶
Package kafkaconsumer extends the BaseApp with a kafka consumer server
Example ¶
package main import ( "context" "github.com/sabariramc/goserverbase/v6/app/server/kafkaclient" "github.com/sabariramc/goserverbase/v6/errors" "github.com/sabariramc/goserverbase/v6/kafka" ) func main() { srv := kafkaclient.New() srv.AddHandler(context.Background(), "gobase.test.topic1", func(ctx context.Context, m *kafka.Message) error { return nil }) srv.AddHandler(context.Background(), "gobase.test.topic2", func(ctx context.Context, m *kafka.Message) error { return &errors.CustomError{ErrorCode: "gobase.test.error", ErrorMessage: "error sample"} }) srv.StartClient() }
Output:
Index ¶
- type Config
- type KafkaClient
- func (k *KafkaClient) AddHandler(ctx context.Context, topicName string, handler KafkaEventProcessor)
- func (k *KafkaClient) Commit(ctx context.Context) error
- func (k *KafkaClient) GetCorrelationParams(headers map[string]string) *correlation.CorrelationParam
- func (k *KafkaClient) GetMessageContext(msg *kafka.Message) context.Context
- func (k *KafkaClient) GetSpanFromContext(ctx context.Context) (span.Span, bool)
- func (k *KafkaClient) GetUserIdentifier(headers map[string]string) *correlation.UserIdentifier
- func (k *KafkaClient) HealthCheck(ctx context.Context) error
- func (k *KafkaClient) HealthCheckMonitor(ctx context.Context)
- func (k *KafkaClient) Name(ctx context.Context) string
- func (k *KafkaClient) ProcessEvent(ctx context.Context, msg *kafka.Message, handler KafkaEventProcessor)
- func (k *KafkaClient) Shutdown(ctx context.Context) error
- func (k *KafkaClient) StartClient()
- func (k *KafkaClient) StatusCheck(ctx context.Context) (any, error)
- func (k *KafkaClient) StoreMessage(ctx context.Context, msg *kafka.Message) error
- func (k *KafkaClient) Subscribe(ctx context.Context)
- type KafkaEventProcessor
- type Options
- type Tracer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { *baseapp.Config // Embeds for base config *kafka.ConsumerConfig // Embeds for kafka consumer config HealthCheckInterval uint // Interval in seconds to do health check of various modules HealthCheckResultPath string // Local disk file path for writing health check results Log log.Log // Logger instance. Tracer Tracer // Tracer instance. }
Config holds the configuration for the application.
func GetDefaultConfig ¶
func GetDefaultConfig() *Config
GetDefaultConfig creates a new Config with values from environment variables or default values.
Environment Variables - KAFKACS__HEALTH_CHECK_INTERVAL: Sets [HealthCheckInterval] - KAFKACS__HEALTH_CHECK_RESULT_PATH: Sets [HealthCheckResultPath]
type KafkaClient ¶
KafkaClient represents a Kafka consumer server. Implements ShutdownHook, HealthCheckHook and StatusCheckHook
func (*KafkaClient) AddHandler ¶
func (k *KafkaClient) AddHandler(ctx context.Context, topicName string, handler KafkaEventProcessor)
AddHandler adds a handler for processing Kafka events for the specified topic.
func (*KafkaClient) Commit ¶
func (k *KafkaClient) Commit(ctx context.Context) error
Commit commits the current offset of the Kafka consumer.
func (*KafkaClient) GetCorrelationParams ¶
func (k *KafkaClient) GetCorrelationParams(headers map[string]string) *correlation.CorrelationParam
GetCorrelationParams extracts correlation parameters from the given headers and returns a CorrelationParam instance.
func (*KafkaClient) GetMessageContext ¶
func (k *KafkaClient) GetMessageContext(msg *kafka.Message) context.Context
GetMessageContext creates a context for processing a Kafka message with correlation parameters and user identifier. If a tracer was passed during the server initiation, create a new span for every message and updates attribute
func (*KafkaClient) GetSpanFromContext ¶
GetSpanFromContext retrieves the OpenTelemetry span from the given context.
func (*KafkaClient) GetUserIdentifier ¶
func (k *KafkaClient) GetUserIdentifier(headers map[string]string) *correlation.UserIdentifier
GetUserIdentifier extracts user identifier from the given headers and returns a UserIdentifier instance.
func (*KafkaClient) HealthCheck ¶
func (k *KafkaClient) HealthCheck(ctx context.Context) error
HealthCheck runs a health check on the Kafka consumer server.
func (*KafkaClient) HealthCheckMonitor ¶
func (k *KafkaClient) HealthCheckMonitor(ctx context.Context)
HealthCheckMonitor starts a health check monitor that periodically runs health checks.
func (*KafkaClient) Name ¶
func (k *KafkaClient) Name(ctx context.Context) string
Name returns the name of the KafkaClient. Implementation of the hook interface defined in the BaseApp
func (*KafkaClient) ProcessEvent ¶
func (k *KafkaClient) ProcessEvent(ctx context.Context, msg *kafka.Message, handler KafkaEventProcessor)
ProcessEvent processes a Kafka message using the specified handler.
func (*KafkaClient) Shutdown ¶
func (k *KafkaClient) Shutdown(ctx context.Context) error
Shutdown gracefully shuts down the Kafka consumer server. Implementation for shutdown hook
func (*KafkaClient) StartClient ¶ added in v6.0.1
func (k *KafkaClient) StartClient()
StartClient starts the Kafka client. And starts background process for message poll, signal monitoring and set up cleanup steps when the server shutdowns
func (*KafkaClient) StatusCheck ¶
func (k *KafkaClient) StatusCheck(ctx context.Context) (any, error)
StatusCheck runs a status check on the Kafka consumer server.
func (*KafkaClient) StoreMessage ¶
StoreMessage stores the given Kafka message.
func (*KafkaClient) Subscribe ¶
func (k *KafkaClient) Subscribe(ctx context.Context)
Subscribe subscribes to Kafka topics and starts consuming messages.
type KafkaEventProcessor ¶
KafkaEventProcessor defines the function signature for processing Kafka events handlers.
type Options ¶
type Options func(*Config)
Options represents options for configuring a KafkaClient instance.
func WithKafkaConsumerConfig ¶
func WithKafkaConsumerConfig(config *kafka.ConsumerConfig) Options
WithKafkaConsumerConfig sets the Kafka consumer configuration for KafkaClient.
func WithNotifier ¶
WithNotifier sets the notifier instance for KafkaClient.
func WithServerConfig ¶
WithServerConfig sets the server configuration for KafkaClient.
func WithTracer ¶
WithTracer sets the tracer instance for KafkaClient.