Documentation ¶
Overview ¶
Package kafka manages interaction with Kafka.
Index ¶
- func Emit(ctx context.Context, producer *kafka.Writer, message []byte, ...) error
- func SignedBlindedTokenIssuerHandler(ctx context.Context, msg kafka.Message, producer *kafka.Writer, ...) error
- func SignedTokenRedeemHandler(ctx context.Context, msg kafka.Message, producer *kafka.Writer, ...) error
- func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
- type MessageContext
- type Processor
- type SignedIssuerToken
- type TopicMapping
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Emit ¶
func Emit(
	ctx context.Context,
	producer *kafka.Writer,
	message []byte,
	logger *zerolog.Logger,
) error
Emit sends a message over the Kafka interface.
func SignedBlindedTokenIssuerHandler ¶
func SignedBlindedTokenIssuerHandler(
	ctx context.Context,
	msg kafka.Message,
	producer *kafka.Writer,
	server *cbpServer.Server,
	log *zerolog.Logger,
) error
SignedBlindedTokenIssuerHandler emits signed, blinded tokens based on provided blinded tokens.
When an unrecoverable error prevents progress, the handler returns nil, because future attempts to process a permanent failure will not succeed. These permanent failures differ from temporary errors encountered while processing the request data. A permanent failure inside the data processing loop is simply added to the results. A temporary error inside the loop, however, should break the loop and return a non-nil error, just like the errors outside the data processing loop.
TODO: It would be better for the Server implementation and the Kafka implementation of this behavior to share utility functions rather than passing an instance of the server as an argument here. That will require a bit of refactoring.
Types ¶
type MessageContext ¶
type MessageContext struct {
// contains filtered or unexported fields
}
MessageContext is used for channel coordination when processing batches of messages.
type SignedIssuerToken ¶
type SignedIssuerToken struct {
// contains filtered or unexported fields
}
type TopicMapping ¶
TopicMapping represents a Kafka topic, how to process it, and where to emit the result.