fusion

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2020 License: MIT Imports: 7 Imported by: 2

README ¶

💥 Fusion

WIP

Fusion is a tiny stream processing library written in Go.

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

This section is empty.

Functions ¶

This section is empty.

Types ¶

type Fusion ¶ added in v0.2.0

type Fusion struct {
	// contains filtered or unexported fields
}

Fusion represents a fusion streaming pipeline. A fusion instance has a stream source and one or more processing stages.

func New ¶

func New(source Source, stages []Processor, opts Options) (*Fusion, error)

New returns a new fusion stream processing pipeline instance with given source and processor stages.

func (*Fusion) Run ¶ added in v0.2.0

func (fu *Fusion) Run(ctx context.Context) error

Run spawns all the worker goroutines and blocks until all of them exit. Worker threads exit when context is cancelled or when source closes. It returns any error that was returned from the source.

type LineStream ¶ added in v0.1.4

type LineStream struct {
	From   io.Reader // From is the reader to use.
	Offset int       // Offset to start at.
	Size   int       // Number of lines to stream.
	// contains filtered or unexported fields
}

LineStream implements a source using io.Reader. This implementation scans the reader line-by-line and streams each line as a message. If offset is set, 'offset' number of lines are read and skipped. If Size is set, only 'size' number of lines are read after which the source will return EOF.

func (*LineStream) ConsumeFrom ¶ added in v0.2.0

func (rd *LineStream) ConsumeFrom(ctx context.Context) (<-chan Message, error)

ConsumeFrom sets up the source channel and sets up goroutines for writing to it.

func (*LineStream) Err ¶ added in v0.2.0

func (rd *LineStream) Err() error

Err returns the error that caused the source to end.

type Logger ¶

type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}

Logger implementations provide logging facilities for Actor.

type Message ¶

type Message struct {
	// Payloads can contain one or more message payloads.
	Payloads []Payload `json:"payloads"`

	// Ack will be used to signal an ACK/nACK when message has passed
	// through the pipeline. A no-op value must be set when there is
	// no need for ack. Ack must be idempotent. If acknowledge fails,
	// source is free to re-send the message through normal means.
	// Cause can be set when success=false to send the information
	// about the reason for failure.
	Ack func(success bool, cause error)
}

Message represents a message with one or more payloads.

func (*Message) Clone ¶ added in v0.2.0

func (msg *Message) Clone() Message

Clone returns a deep clone of the original message. Ack function will be set to no-op in the clone.

type NoOpProcessor ¶ added in v0.2.0

type NoOpProcessor struct{}

NoOpProcessor consumes the messages and simply ignores them.

func (NoOpProcessor) Process ¶ added in v0.2.0

func (NoOpProcessor) Process(_ context.Context, _ Message) (*Message, error)

type Options ¶

type Options struct {
	Workers int

	// Logger to use for the fusion instance. If not set, no-op logger
	// will be set.
	Logger Logger

	// DrainWithin is the timeout to wait to properly drain the channel
	// and send NACKs for all messages. If not set, stream will not be
	// drained.
	DrainWithin time.Duration
}

Options represents optional configuration values for the fusion instance.

type Payload ¶ added in v0.2.0

type Payload struct {
	Key []byte `json:"key"`
	Val []byte `json:"val"`
}

Payload represents a key-value pair of arbitrary data.

func (Payload) Clone ¶ added in v0.2.0

func (p Payload) Clone() Payload

Clone returns a depp clone of the payload.

type Processor ¶

type Processor interface {
	// Processor can apply some processing to the message and return the result.
	// If the returned message has no payload, fusion will assume end of the
	// pipeline (i.e., a sink) and call the Ack() on the original message.
	Process(ctx context.Context, msg Message) (*Message, error)
}

Processor represents a processor stage in the stream pipeline. It receives messages from a source or another processor stage from upstream and applies some processing and sends the resultant message downstream.

type ProcessorFunc ¶ added in v0.2.0

type ProcessorFunc func(ctx context.Context, msg Message) (*Message, error)

ProcessorFunc is an adaptor to allow Go function values as processor implementations.

func (ProcessorFunc) Process ¶ added in v0.2.0

func (pf ProcessorFunc) Process(ctx context.Context, msg Message) (*Message, error)

Process dispatches the call to the wrapped function value.

type Source ¶ added in v0.2.0

type Source interface {
	// ConsumeFrom should return a channel to which it independently writes
	// the data stream to. It is the responsibility of this Source to close
	// the returned channel once the data is exhausted. goroutines spawned
	// by the source must be tied to the given context and exit when context
	// is cancelled.
	ConsumeFrom(ctx context.Context) (<-chan Message, error)
}

Source implementation is the source of data in a pipeline.

type SourceFunc ¶ added in v0.2.0

type SourceFunc func(ctx context.Context) (*Message, error)

SourceFunc implements a source using a Go function value.

func (SourceFunc) ConsumeFrom ¶ added in v0.2.0

func (sf SourceFunc) ConsumeFrom(ctx context.Context) (<-chan Message, error)

ConsumeFrom launches a goroutine that continuously calls the wrapped function and writes the return message to the channel. Stops when ctx is cancelled or the function returns an error.

Directories ¶

Path Synopsis
examples
reactor module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳