kafka

package
v0.0.0-...-f7bb4bf
Warning

This package is not in the latest version of its module.

Published: Feb 19, 2025 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Overview

Package kafka manages interaction with Kafka.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Emit

func Emit(
	ctx context.Context,
	producer *kafka.Writer,
	message []byte,
	logger *zerolog.Logger,
) error

Emit sends a message over the Kafka interface.

func SignedBlindedTokenIssuerHandler

func SignedBlindedTokenIssuerHandler(
	ctx context.Context,
	msg kafka.Message,
	producer *kafka.Writer,
	server *cbpServer.Server,
	log *zerolog.Logger,
) error

SignedBlindedTokenIssuerHandler emits signed, blinded tokens based on provided blinded tokens.

When an unrecoverable (permanent) error prevents progress, the handler returns nil,
because future attempts to process a permanent failure will not succeed. These permanent
failure cases are different from cases where we encounter temporary errors. For permanent
failures inside the data processing loop, we simply add the error to the results. Temporary
errors inside the loop, however, should break the loop and return non-nil, just like errors
outside the data processing loop, so that the message can be attempted again.

@TODO: It would be better for the Server implementation and the Kafka implementation of
this behavior to share utility functions rather than passing an instance of the server
as an argument here. That will require a bit of refactoring.

func SignedTokenRedeemHandler

func SignedTokenRedeemHandler(
	ctx context.Context,
	msg kafka.Message,
	producer *kafka.Writer,
	server *cbpServer.Server,
	log *zerolog.Logger,
) error

func StartConsumers

func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error

StartConsumers reads configuration variables and starts the associated Kafka consumers.

Types

type MessageContext

type MessageContext struct {
	// contains filtered or unexported fields
}

MessageContext is used for channel coordination when processing batches of messages.

type Processor

type Processor func(context.Context, kafka.Message, *zerolog.Logger) error

Processor is a function that is used to process Kafka messages.

type SignedIssuerToken

type SignedIssuerToken struct {
	// contains filtered or unexported fields
}

type TopicMapping

type TopicMapping struct {
	Topic     string
	Processor Processor
}

TopicMapping represents a Kafka topic, how to process it, and where to emit the result.
