Documentation
¶
Index ¶
- func WithRetry(action func() (bool, error), intialBackoff time.Duration, ...) error
- type Batcher
- type EventSequencesWithIds
- type HasPulsarMessageIds
- type IngestionPipeline
- func NewFilteredMsgIngestionPipeline[T HasPulsarMessageIds](pulsarConfig configuration.PulsarConfig, pulsarSubscriptionName string, ...) *IngestionPipeline[T]
- func NewIngestionPipeline[T HasPulsarMessageIds](pulsarConfig configuration.PulsarConfig, pulsarSubscriptionName string, ...) *IngestionPipeline[T]
- type InstructionConverter
- type Sink
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Batcher ¶
type Batcher[T any] struct {
// contains filtered or unexported fields
}
Batcher batches up events from a channel. Batches are created whenever maxItems have been received or maxTimeout has elapsed since the last batch was created (whichever occurs first).
func NewBatcher ¶
func NewBatcher[T any](input chan T, maxItems int, maxTimeout time.Duration, callback func([]T)) *Batcher[T]
type EventSequencesWithIds ¶
type EventSequencesWithIds struct {
EventSequences []*armadaevents.EventSequence
MessageIds []pulsar.MessageID
}
EventSequencesWithIds consists of a batch of Event Sequences along with the corresponding Pulsar Message Ids
type HasPulsarMessageIds ¶
type HasPulsarMessageIds interface {
GetMessageIDs() []pulsar.MessageID
}
HasPulsarMessageIds should be implemented by structs that can store a batch of pulsar message ids This is needed so we can pass message Ids down the pipeline and ack them at the end
type IngestionPipeline ¶
type IngestionPipeline[T HasPulsarMessageIds] struct {
// contains filtered or unexported fields
}
IngestionPipeline is a pipeline that reads message from pulsar and inserts them into a sink. The pipeline will handle the following automatically:
- Receiving messages from pulsar
- Combining messages into batches for efficient processing
- Unmarshalling into event sequences
- Acking processed messages
Callers must supply two structs, an InstructionConverter for converting event sequences into something that can be exhausted and a Sink capable of exhausting these objects
func NewFilteredMsgIngestionPipeline ¶ added in v0.3.49
func NewFilteredMsgIngestionPipeline[T HasPulsarMessageIds](
pulsarConfig configuration.PulsarConfig,
pulsarSubscriptionName string,
pulsarBatchSize int,
pulsarBatchDuration time.Duration,
pulsarSubscriptionType pulsar.SubscriptionType,
msgFilter func(msg pulsar.Message) bool,
converter InstructionConverter[T],
sink Sink[T],
metricsConfig configuration.MetricsConfig,
metrics *commonmetrics.Metrics,
) *IngestionPipeline[T]
NewFilteredMsgIngestionPipeline creates an IngestionPipeline that processes only messages corresponding to the supplied message filter
func NewIngestionPipeline ¶
func NewIngestionPipeline[T HasPulsarMessageIds](
pulsarConfig configuration.PulsarConfig,
pulsarSubscriptionName string,
pulsarBatchSize int,
pulsarBatchDuration time.Duration,
pulsarSubscriptionType pulsar.SubscriptionType,
converter InstructionConverter[T],
sink Sink[T],
metricsConfig configuration.MetricsConfig,
metrics *commonmetrics.Metrics,
) *IngestionPipeline[T]
NewIngestionPipeline creates an IngestionPipeline that processes all pulsar messages
type InstructionConverter ¶
type InstructionConverter[T HasPulsarMessageIds] interface {
Convert(ctx *armadacontext.Context, msg *EventSequencesWithIds) T
}
InstructionConverter should be implemented by structs that can convert a batch of event sequences into an object suitable for passing to the sink
type Sink ¶
type Sink[T HasPulsarMessageIds] interface {
// Store should persist the sink. The store is responsible for retrying failed attempts and should only return an error
// When it is satisfied that operation cannot be retries.
Store(ctx *armadacontext.Context, msg T) error
}
Sink should be implemented by the struct responsible for putting the data in its final resting place, e.g. a database.