Documentation ¶
Types ¶
type Backoff ¶
type Backoff interface {
	// RetryAfter should return the time duration which should be
	// elapsed before the next queueForRetry.
	RetryAfter(attempts int) time.Duration
}
Backoff represents a backoff strategy to be used by the Retrier.
func ConstBackoff ¶
ConstBackoff implements a constant interval Backoff strategy.
type DelayQueue ¶
type DelayQueue interface {
	// Enqueue must save the message with priority based on the timestamp set.
	// If no timestamp is set, current timestamp should be assumed.
	Enqueue(item Item) error

	// Dequeue should read one message that has an expired timestamp and call
	// readFn with it. Success/failure from readFn must be considered as ACK
	// or nACK respectively. When message is not available, Dequeue should not
	// block but return ErrNoMessage. queue can return EOF to indicate that the
	// queue is fully drained. Other errors from the queue will be logged and
	// ignored.
	Dequeue(ctx context.Context, readFn ReadFn) error
}
A DelayQueue implementation maintains messages in timestamp-based order. The Retrier uses it to schedule retries.
type InMemQ ¶
type InMemQ struct {
// contains filtered or unexported fields
}
InMemQ implements an in-memory min-heap based message queue.
type Item ¶
type Item struct {
	Message     fusion.Msg `json:"message"`
	Attempts    int        `json:"attempts"`
	NextAttempt time.Time  `json:"next_attempt"`
	LastAttempt time.Time  `json:"last_attempt"`
}
Item is maintained by the delay queue and tracks retry state: the number of attempts made so far and the times of the last and next attempts.
type Retrier ¶
type Retrier struct {
	// Proc is the fusion Proc that must be executed for each message.
	// This field must be set.
	Proc fusion.Proc

	// Queue can be set to use a delay queue. If not set, an in-memory
	// queue will be used.
	Queue DelayQueue

	// Backoff can be set to configure a backoff strategy to be used
	// during retries. If not set, constant backoff strategy will be
	// used with 1s intervals.
	Backoff Backoff

	// MaxRetries can be set to control how many retries should be done
	// before returning Fail status. Defaults to 3.
	MaxRetries int

	// EnqueueWorkers is the number of worker threads to use for moving
	// messages from stream to the queue.
	EnqueueWorkers int

	// ProcWorkers is the number of main worker threads to use for running
	// proc.
	ProcWorkers int

	// OnFailure is called when a message fails and exhausts all retries.
	// If not set, such messages will be logged and discarded.
	OnFailure func(item Item)

	// Log can be set to customise logging mechanism used by retrier. If
	// not set, logging will be disabled.
	Log fusion.Log
}
Retrier is a fusion Proc that wraps another Proc implementation to provide automatic retries.