Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Fusion ¶ added in v0.2.0
type Fusion struct {
// contains filtered or unexported fields
}
Fusion represents a fusion streaming pipeline. A fusion instance has a stream source and one or more processing stages.
type LineStream ¶ added in v0.1.4
type LineStream struct {
	From   io.Reader // From is the reader to use.
	Offset int       // Offset to start at.
	Size   int       // Number of lines to stream.
	// contains filtered or unexported fields
}
LineStream implements a source using io.Reader. It scans the reader line by line and streams each line as a message. If Offset is set, the first Offset lines are read and skipped. If Size is set, at most Size lines are streamed, after which the source returns EOF.
func (*LineStream) ConsumeFrom ¶ added in v0.2.0
func (rd *LineStream) ConsumeFrom(ctx context.Context) (<-chan Message, error)
ConsumeFrom sets up the source channel and starts the goroutines that write to it.
func (*LineStream) Err ¶ added in v0.2.0
func (rd *LineStream) Err() error
Err returns the error that caused the source to end.
type Logger ¶
type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}
Logger implementations provide logging facilities for a fusion instance.
type Message ¶
type Message struct {
	// Payloads can contain one or more message payloads.
	Payloads []Payload `json:"payloads"`

	// Ack will be used to signal an ACK/nACK when message has passed
	// through the pipeline. A no-op value must be set when there is
	// no need for ack. Ack must be idempotent. If acknowledge fails,
	// source is free to re-send the message through normal means.
	// Cause can be set when success=false to send the information
	// about the reason for failure.
	Ack func(success bool, cause error)
}
Message represents a message with one or more payloads.
type NoOpProcessor ¶ added in v0.2.0
type NoOpProcessor struct{}
NoOpProcessor consumes the messages and simply ignores them.
type Options ¶
type Options struct {
	Workers int

	// Logger to use for the fusion instance. If not set, no-op logger
	// will be set.
	Logger Logger

	// DrainWithin is the timeout to wait to properly drain the channel
	// and send NACKs for all messages. If not set, stream will not be
	// drained.
	DrainWithin time.Duration
}
Options represents optional configuration values for the fusion instance.
type Processor ¶
type Processor interface {
	// Processor can apply some processing to the message and return the result.
	// If the returned message has no payload, fusion will assume end of the
	// pipeline (i.e., a sink) and call the Ack() on the original message.
	Process(ctx context.Context, msg Message) (*Message, error)
}
Processor represents a processor stage in the stream pipeline. It receives messages from a source or from an upstream processor stage, applies some processing, and sends the resulting message downstream.
type ProcessorFunc ¶ added in v0.2.0
ProcessorFunc is an adaptor to allow Go function values as processor implementations.
type Source ¶ added in v0.2.0
type Source interface {
	// ConsumeFrom should return a channel to which it independently writes
	// the data stream to. It is the responsibility of this Source to close
	// the returned channel once the data is exhausted. goroutines spawned
	// by the source must be tied to the given context and exit when context
	// is cancelled.
	ConsumeFrom(ctx context.Context) (<-chan Message, error)
}
A Source implementation supplies the data for a pipeline.
type SourceFunc ¶ added in v0.2.0
SourceFunc implements a source using a Go function value.
func (SourceFunc) ConsumeFrom ¶ added in v0.2.0
func (sf SourceFunc) ConsumeFrom(ctx context.Context) (<-chan Message, error)
ConsumeFrom launches a goroutine that repeatedly calls the wrapped function and writes each returned message to the channel. It stops when ctx is cancelled or the function returns an error.