Documentation
Index
Constants
This section is empty.
Variables
This section is empty.
Functions
Types
type BatchMerger (added in v0.15.0)
type BatchMerger[T utils.ArmadaEvent] func(batch []*utils.EventsWithIds[T]) *utils.EventsWithIds[T]
BatchMerger merges the events within a batch together, where possible.
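The simplest possible BatchMerger just concatenates the batch. The sketch below does only that; a real merger may go further and combine related events. `EventsWithIds` here is a hypothetical stand-in for `utils.EventsWithIds` (assumed to hold a slice of events plus the originating message IDs), and string IDs stand in for `pulsar.MessageID`. `concatMerger` is an illustrative name, not part of the package.

```go
package main

import "fmt"

// EventsWithIds is a hypothetical stand-in for utils.EventsWithIds: events plus
// the IDs of the Pulsar messages they came from. The real fields may differ.
type EventsWithIds[T any] struct {
	Events     []T
	MessageIds []string // stand-in for []pulsar.MessageID
}

// BatchMerger mirrors the documented signature, minus the utils.ArmadaEvent constraint.
type BatchMerger[T any] func(batch []*EventsWithIds[T]) *EventsWithIds[T]

// concatMerger is a hypothetical merger that concatenates the events and message
// IDs of every element of the batch, preserving their order.
func concatMerger[T any](batch []*EventsWithIds[T]) *EventsWithIds[T] {
	merged := &EventsWithIds[T]{}
	for _, b := range batch {
		merged.Events = append(merged.Events, b.Events...)
		merged.MessageIds = append(merged.MessageIds, b.MessageIds...)
	}
	return merged
}

func main() {
	var merge BatchMerger[string] = concatMerger[string]
	out := merge([]*EventsWithIds[string]{
		{Events: []string{"a", "b"}, MessageIds: []string{"m1"}},
		{Events: []string{"c"}, MessageIds: []string{"m2"}},
	})
	fmt.Println(out.Events, out.MessageIds) // [a b c] [m1 m2]
}
```

Keeping the message IDs alongside the merged events matters because they are what gets acked once the batch has been stored.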
type BatchMetricPublisher (added in v0.15.0)
type BatchMetricPublisher[T utils.ArmadaEvent] func(metrics *commonmetrics.Metrics, batch *utils.EventsWithIds[T])
BatchMetricPublisher logs a summary of the batching process.
type Batcher
type Batcher[T any] struct {
// contains filtered or unexported fields
}
Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).
func NewBatcher
func NewBatcher[T any](input <-chan T, maxItems int, maxTimeout time.Duration, itemCountFunc func(T) int, publish chan []T) *Batcher[T]
type EventCounter (added in v0.15.0)
type EventCounter[T utils.ArmadaEvent] func(events *utils.EventsWithIds[T]) int
EventCounter determines the true number of events in a batch, because some utils.ArmadaEvent values can contain nested events.
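To illustrate why a plain `len` can undercount, the sketch below assumes each top-level event is a sequence wrapping several nested events and sums the nested ones. `EventSequence`, `EventsWithIds` and `countNested` are hypothetical stand-ins, not the real Armada types.

```go
package main

import "fmt"

// EventSequence is a hypothetical top-level event that wraps nested events.
type EventSequence struct {
	JobSetName string
	Events     []string
}

// EventsWithIds is a hypothetical stand-in for utils.EventsWithIds.
type EventsWithIds[T any] struct {
	Events     []T
	MessageIds []string
}

// countNested is a hypothetical EventCounter: instead of counting sequences,
// it sums the nested events inside each one.
func countNested(batch *EventsWithIds[*EventSequence]) int {
	total := 0
	for _, seq := range batch.Events {
		total += len(seq.Events)
	}
	return total
}

func main() {
	b := &EventsWithIds[*EventSequence]{Events: []*EventSequence{
		{JobSetName: "a", Events: []string{"submit", "lease"}},
		{JobSetName: "b", Events: []string{"submit"}},
	}}
	fmt.Println(len(b.Events), countNested(b)) // 2 3
}
```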
type HasPulsarMessageIds
type HasPulsarMessageIds interface {
GetMessageIDs() []pulsar.MessageID
}
HasPulsarMessageIds should be implemented by structs that can store a batch of Pulsar message IDs. This is needed so that message IDs can be passed down the pipeline and acked at the end.
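A type satisfies HasPulsarMessageIds simply by carrying the IDs alongside its payload. In this sketch, `MessageID` is a hypothetical stand-in for `pulsar.MessageID` so it compiles without the Pulsar client, and `DbOperations` is an illustrative instruction type.

```go
package main

import "fmt"

// MessageID is a hypothetical stand-in for pulsar.MessageID.
type MessageID interface{ String() string }

type strID string

func (s strID) String() string { return string(s) }

type HasPulsarMessageIds interface {
	GetMessageIDs() []MessageID
}

// DbOperations is a hypothetical instruction type that carries both the work to
// perform and the IDs of the messages it was built from.
type DbOperations struct {
	Rows       []string
	MessageIds []MessageID
}

func (d *DbOperations) GetMessageIDs() []MessageID { return d.MessageIds }

// Compile-time check that *DbOperations satisfies the interface.
var _ HasPulsarMessageIds = (*DbOperations)(nil)

func main() {
	ops := &DbOperations{Rows: []string{"insert job 1"}, MessageIds: []MessageID{strID("1:0:0")}}
	for _, id := range ops.GetMessageIDs() {
		fmt.Println("would ack", id)
	}
}
```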
type IngestionPipeline
type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct {
// contains filtered or unexported fields
}
IngestionPipeline is a pipeline that reads messages from Pulsar and inserts them into a sink. The pipeline handles the following automatically:
- Receiving messages from Pulsar
- Unmarshalling them into eventsWithIds
- Combining messages into batches for efficient processing
- Publishing metrics about each batch
- Converting eventsWithIds to instructions
- Acking processed messages
In addition to the function-typed arguments of NewIngestionPipeline, callers must supply two structs: an InstructionConverter, which converts eventsWithIds into an object the sink can consume, and a Sink, which stores those objects.
func NewIngestionPipeline
func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](
pulsarConfig commonconfig.PulsarConfig,
pulsarTopic string,
pulsarSubscriptionName string,
pulsarBatchSize int,
pulsarBatchDuration time.Duration,
pulsarSubscriptionType pulsar.SubscriptionType,
eventCounter EventCounter[U],
messageConverter MessageUnmarshaller[U],
batchMerger BatchMerger[U],
metricPublisher BatchMetricPublisher[U],
converter InstructionConverter[T, U],
sink Sink[T],
metrics *commonmetrics.Metrics,
) *IngestionPipeline[T, U]
NewIngestionPipeline creates an IngestionPipeline that processes all Pulsar messages.
type InstructionConverter
type InstructionConverter[T HasPulsarMessageIds, U utils.ArmadaEvent] interface {
Convert(ctx *armadacontext.Context, msg *utils.EventsWithIds[U]) T
}
InstructionConverter should be implemented by structs that can convert a batch of eventsWithIds into an object suitable for passing to the sink.
type MessageUnmarshaller (added in v0.15.0)
type MessageUnmarshaller[T utils.ArmadaEvent] func(msg pulsar.ConsumerMessage, metrics *commonmetrics.Metrics) *utils.EventsWithIds[T]
MessageUnmarshaller converts consumed pulsar messages to the intermediate type, utils.EventsWithIds.
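A MessageUnmarshaller must decide what to do with a payload it cannot decode. This sketch decodes JSON (an assumption, chosen only to keep it self-contained), records the failure in a metrics counter, and still returns the message ID so a malformed message can be acked instead of redelivered. That policy is an assumption, not necessarily Armada's. `ConsumerMessage`, `Metrics`, `EventsWithIds`, `JobEvent` and `unmarshalJSON` are hypothetical stand-ins.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for pulsar.ConsumerMessage, *commonmetrics.Metrics
// and utils.EventsWithIds.
type ConsumerMessage struct {
	ID      string
	Payload []byte
}

type Metrics struct{ FailedMessages int }

type EventsWithIds[T any] struct {
	Events     []T
	MessageIds []string
}

type JobEvent struct {
	JobID string `json:"jobId"`
	Kind  string `json:"kind"`
}

// unmarshalJSON is a hypothetical MessageUnmarshaller that decodes a JSON array
// of events. On a decode error it counts the failure and returns no events, but
// keeps the message ID so the message can still be acked (assumed policy).
func unmarshalJSON(msg ConsumerMessage, m *Metrics) *EventsWithIds[*JobEvent] {
	var events []*JobEvent
	if err := json.Unmarshal(msg.Payload, &events); err != nil {
		m.FailedMessages++
		return &EventsWithIds[*JobEvent]{MessageIds: []string{msg.ID}}
	}
	return &EventsWithIds[*JobEvent]{Events: events, MessageIds: []string{msg.ID}}
}

func main() {
	m := &Metrics{}
	ok := unmarshalJSON(ConsumerMessage{ID: "m1", Payload: []byte(`[{"jobId":"j1","kind":"submit"}]`)}, m)
	bad := unmarshalJSON(ConsumerMessage{ID: "m2", Payload: []byte(`not json`)}, m)
	fmt.Println(len(ok.Events), ok.Events[0].JobID, len(bad.Events), m.FailedMessages) // 1 j1 0 1
}
```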
type Sink
type Sink[T HasPulsarMessageIds] interface {
// Store should persist msg. The store is responsible for retrying failed attempts and should only return an error
// when it is satisfied that the operation cannot be retried.
Store(ctx *armadacontext.Context, msg T) error
}
Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.