Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // Skip can be returned from proc implementations to signal that the // message has been skipped from processing but should still be acked. Skip = errors.New("skip message") // Fail can be returned from proc implementations to signal that the // message should be marked as failed but should be acked. This can // be useful when Proc knows the message can never successfully be // processed. Fail = errors.New("fail message") )
Functions ¶
This section is empty.
Types ¶
type Fusion ¶ added in v0.2.0
type Fusion struct {
// contains filtered or unexported fields
}
Fusion represents a fusion streaming pipeline. A fusion instance has a stream source and one or more processing stages.
type LineStream ¶ added in v0.1.4
type LineStream struct { From io.Reader // From is the reader to use. Offset int // Offset to start at. Size int // Number of lines to stream. Buffer int // Stream channel buffer size. // contains filtered or unexported fields }
LineStream implements a source using io.Reader. This implementation scans the reader line-by-line and streams each line as a message. If offset is set, 'offset' number of lines are read and skipped. If Size is set, only 'size' number of lines are read after which the source will return EOF.
func (*LineStream) ConsumeFrom ¶ added in v0.2.0
func (rd *LineStream) ConsumeFrom(ctx context.Context) (<-chan Msg, error)
ConsumeFrom sets up the source channel and sets up goroutines for writing to it.
func (*LineStream) Err ¶ added in v0.2.0
func (rd *LineStream) Err() error
Err returns the error that caused the source to end.
type Logger ¶
type Logger interface { Debugf(msg string, args ...interface{}) Infof(msg string, args ...interface{}) Warnf(msg string, args ...interface{}) Errorf(msg string, args ...interface{}) }
Logger implementations provide logging facilities for Actor.
type Msg ¶ added in v0.2.4
type Msg struct { Key []byte `json:"key"` Val []byte `json:"val"` Attribs map[string]string `json:"attribs"` // Ack will be used to signal an ACK/nACK when message has passed // through the pipeline. A no-op value must be set when there is // no need for ack. Ack must be idempotent. If acknowledge fails, // source is free to re-send the message through normal means. // Cause can be set when success=false to send the information // about the reason for failure. Ack func(success bool, cause error) }
Msg represents a message with one or more payloads.
type Options ¶
type Options struct { // workers represents the number of workers to launch for reading // from the source and invoking stages. Workers int // Proc represents the processor to be used by the fusion pipeline to // process the incoming messages. If not set, a default no-op will be // used. Proc Proc // OnFinish when set, will be called when a proc successfully processes // a message or fails by returning Fail or skips by returning Skip. OnFinish func(msg Msg, err error) // Logger to use for the fusion instance. If not set, no-op logger // will be set. Logger Logger // drainT is the timeout to wait to properly drain the channel // and send NACKs for all messages when fusion instance is exiting // before stream is exhausted due to a context cancellation etc. If // not set, stream will not be drained. DrainWithin time.Duration }
Options represents optional configuration values for the fusion instance.
type Proc ¶ added in v0.2.1
Proc represents a processor in the stream pipeline. Proc can return nil or Skip or Fail to indicate a terminal state in which case the msg will be acknowledged. If proc returns any other error, it will be nAcked.
type Source ¶ added in v0.2.0
type Source interface { // ConsumeFrom should return a channel to which it independently writes // the data stream to. It is the responsibility of this Source to close // the returned channel once the data is exhausted. goroutines spawned // by the source must be tied to the given context and exit when context // is cancelled. ConsumeFrom(ctx context.Context) (<-chan Msg, error) }
Source implementation is the source of data in a pipeline.
type SourceFunc ¶ added in v0.2.0
SourceFunc implements a source using a Go function value.
func (SourceFunc) ConsumeFrom ¶ added in v0.2.0
func (sf SourceFunc) ConsumeFrom(ctx context.Context) (<-chan Msg, error)
ConsumeFrom launches a goroutine that continuously calls the wrapped function and writes the return message to the channel. Stops when ctx is cancelled or the function returns an error.