Documentation
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNoMessage can be returned from stream/queue implementations to indicate that
	// the stream/queue currently has no message. Actor might switch to polling mode in
	// this case.
	ErrNoMessage = errors.New("no message available")

	// Skip can be returned by processor functions to indicate that the message
	// should be ignored.
	Skip = errors.New("skip message")

	// Failed can be returned from processor functions to indicate that the message
	// must be failed immediately (i.e., skip retries even if available). Processors
	// can wrap this message using '%w' directive in fmt functions.
	Failed = errors.New("fail message")
)
Functions ¶
This section is empty.
Types ¶
type Actor ¶
type Actor struct {
	Logger
	// contains filtered or unexported fields
}
Actor represents an entity that consumes messages from the stream and acts on them. If processing a message from the stream fails and retries are enabled, the message is queued in the configured delay queue. If retries are not enabled, the message is passed to the configured OnFailure handler.
type Backoff ¶
type Backoff interface {
	// RetryAfter should return the time duration which should be
	// elapsed before the next queueForRetry.
	RetryAfter(msg Message) time.Duration
}
Backoff represents a backoff strategy to be used by the actor.
func ConstBackoff ¶
ConstBackoff implements a constant interval Backoff strategy.
type DelayQueue ¶
type DelayQueue interface {
	// Enqueue must save the message with priority based on the timestamp set.
	// If no timestamp is set, current timestamp should be assumed.
	Enqueue(msg Message) error

	// Dequeue should read one message that has an expired timestamp and call
	// readFn with it. Success/failure from readFn must be considered as ACK
	// or nACK respectively. When message is not available, Dequeue should not
	// block but return ErrNoMessage. Queue can return EOF to indicate that the
	// queue is fully drained. Other errors from the queue will be logged and
	// ignored.
	Dequeue(ctx context.Context, readFn ReadFn) error
}
DelayQueue implementation maintains the messages in a timestamp based order. This is used by actor for retries.
type InMemQ ¶
type InMemQ struct {
// contains filtered or unexported fields
}
InMemQ implements an in-memory min-heap based message queue.
type LineStream ¶ added in v0.1.4
type LineStream struct {
	From   io.Reader // From is the reader to use.
	Offset int       // Offset to start at.
	Size   int       // Number of lines to stream.
	// contains filtered or unexported fields
}
LineStream implements a stream using io.Reader. This implementation scans the reader line-by-line and streams each line as a message. If Offset is set, that many lines are read and skipped first. If Size is set, only that many lines are streamed, after which the stream returns EOF.
func (*LineStream) Close ¶ added in v0.1.4
func (rd *LineStream) Close() error
Close closes the underlying reader if supported.
type Logger ¶
type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}
Logger implementations provide logging facilities for Actor.
type Message ¶
type Message struct {
	Key []byte `json:"key" xml:"key"`
	Val []byte `json:"val" xml:"val"`

	// Time at which message arrived or should be processed when scheduled by
	// retrying logic. (Managed by the actor)
	Time time.Time `json:"time" xml:"time"`

	// Attempts is incremented by the actor every time an attempt is done to
	// process the message.
	Attempts int `json:"attempts" xml:"attempts"`
}
Message represents a message from the stream. Contents of key and value are not validated by the framework itself, but may be validated by the processor functions.
type Options ¶
type Options struct {
	// Stream is the primary source of messages for the actor. Worker
	// threads read from stream continuously and process the messages.
	// If stream is not set, actor will rely entirely on the queue and
	// manually enqueued messages using actor.Enqueue().
	Stream Stream

	// Queue is used for retries and for manually enqueuing messages
	// for the actor. If queue is not set, an in-memory queue will be
	// used.
	Queue DelayQueue

	// Processor function to use for processing the messages. If not
	// set, a no-op processor will be used that skips everything.
	Processor Processor

	// Workers is the number of worker goroutines to spawn when actor
	// starts. Defaults to 1.
	Workers int

	// MaxRetries is the number of retry attempts allowed. If not set,
	// retries are disabled.
	MaxRetries int

	// Backoff strategy to be used for retries. Messages that need to
	// be retried are re-queued to a future time based on the delay
	// returned by backoff. If not set, retries are disabled.
	Backoff Backoff

	// OnFailure if set, will be called when all retries are exhausted
	// and the processing has not succeeded. If not set, such messages
	// will simply be logged and ignored. err argument will have error
	// that occurred in processor in the last retry attempt. If this
	// returns error, message will not be committed to the stream/queue
	// where it was read from.
	OnFailure func(msg Message, err error) error

	// Logger can be overridden to use custom logger.
	Logger Logger

	// PollInterval to use when non-blocking queues/streams return ErrNoMessage.
	// Defaults to 300ms.
	PollInterval time.Duration
}
Options represents the configuration options for an actor instance.
type Processor ¶
Processor implementations define the logic to be executed by actor on receiving a message from the stream.
type Stream ¶
type Stream interface {
	// Read should read the next message available in the stream and
	// call readFn with it. Success/Failure of the readFn invocation
	// should be used as ACK/NACK respectively.
	Read(ctx context.Context, readFn ReadFn) error
}
Stream represents an immutable stream of messages.