fusion

package module
v0.2.5
Warning

This package is not in the latest version of its module.

Published: Jul 23, 2020 License: MIT Imports: 7 Imported by: 2

README ¶

💥 Fusion

WIP

Fusion is a tiny stream processing library written in Go.

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

var (
	// Skip can be returned from proc implementations to signal that the
	// message has been skipped from processing but should still be acked.
	Skip = errors.New("skip message")

	// Fail can be returned from proc implementations to signal that the
	// message should be marked as failed but should be acked. This can
	// be useful when Proc knows the message can never successfully be
	// processed.
	Fail = errors.New("fail message")
)

Functions ¶

This section is empty.

Types ¶

type Fusion ¶ added in v0.2.0

type Fusion struct {
	// contains filtered or unexported fields
}

Fusion represents a fusion streaming pipeline. A fusion instance has a stream source and one or more processing stages.

func New ¶

func New(source Source, opts Options) (*Fusion, error)

New returns a new fusion stream-processing pipeline instance for the given source, configured by opts. If no Proc is set in opts, the pipeline simply drains the source.

func (*Fusion) Run ¶ added in v0.2.0

func (fu *Fusion) Run(ctx context.Context) error

Run spawns all the worker goroutines and blocks until all of them exit. Workers exit when the context is cancelled or when the source closes. Run returns any error that was returned from the source.
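The behaviour described for Run (start workers, block until they exit, stop on cancellation or when the source closes) follows a common Go pattern. The sketch below illustrates that pattern only; it is not the library's implementation, and `runWorkers` and `countAll` are hypothetical names introduced for this example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers is a hypothetical helper, not part of fusion. It starts n
// goroutines that read from stream and blocks until all of them exit,
// which happens when stream is closed or ctx is cancelled.
func runWorkers(ctx context.Context, n int, stream <-chan string, handle func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // context cancelled
				case item, ok := <-stream:
					if !ok {
						return // source closed its channel
					}
					handle(item)
				}
			}
		}()
	}
	wg.Wait()
}

// countAll feeds items through runWorkers and reports how many were handled.
func countAll(items []string, workers int) int64 {
	stream := make(chan string, len(items))
	for _, it := range items {
		stream <- it
	}
	close(stream)

	var handled int64
	runWorkers(context.Background(), workers, stream, func(string) {
		atomic.AddInt64(&handled, 1)
	})
	return handled
}

func main() {
	fmt.Println(countAll([]string{"a", "b", "c"}, 2))
}
```

Because the workers share one channel, each item is handled by exactly one worker, and closing the channel is enough to make every worker return.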

type LineStream ¶ added in v0.1.4

type LineStream struct {
	From   io.Reader // From is the reader to use.
	Offset int       // Offset to start at.
	Size   int       // Number of lines to stream.
	Buffer int       // Stream channel buffer size.
	// contains filtered or unexported fields
}

LineStream implements a source using io.Reader. This implementation scans the reader line by line and streams each line as a message. If Offset is set, that many lines are read and skipped first. If Size is set, only Size lines are streamed, after which the source returns EOF.

func (*LineStream) ConsumeFrom ¶ added in v0.2.0

func (rd *LineStream) ConsumeFrom(ctx context.Context) (<-chan Msg, error)

ConsumeFrom sets up the source channel and sets up goroutines for writing to it.

func (*LineStream) Err ¶ added in v0.2.0

func (rd *LineStream) Err() error

Err returns the error that caused the source to end.

type Logger ¶

type Logger interface {
	Debugf(msg string, args ...interface{})
	Infof(msg string, args ...interface{})
	Warnf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}

Logger implementations provide logging facilities for the fusion instance.

type Msg ¶ added in v0.2.4

type Msg struct {
	Key     []byte            `json:"key"`
	Val     []byte            `json:"val"`
	Attribs map[string]string `json:"attribs"`

	// Ack will be used to signal an ACK/nACK when message has passed
	// through the pipeline. A no-op value must be set when there is
	// no need for ack. Ack must be idempotent. If acknowledge fails,
	// source is free to re-send the message through normal means.
	// Cause can be set when success=false to send the information
	// about the reason for failure.
	Ack func(success bool, cause error)
}

Msg represents a message with one or more payloads.
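The Ack field must be idempotent, and a no-op must be set when no acknowledgement is needed. One way a source might satisfy both requirements is sketched below. The local `Msg` type mirrors the documented struct so the example compiles on its own; `onceAck`, `noopAck`, and `countAcks` are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// Msg mirrors the documented struct so this sketch is self-contained.
type Msg struct {
	Key     []byte
	Val     []byte
	Attribs map[string]string
	Ack     func(success bool, cause error)
}

// onceAck is a hypothetical helper. It wraps report so that only the
// first call has any effect, which makes the resulting Ack idempotent.
func onceAck(report func(success bool, cause error)) func(bool, error) {
	var once sync.Once
	return func(success bool, cause error) {
		once.Do(func() { report(success, cause) })
	}
}

// noopAck is a hypothetical no-op for sources that need no acknowledgement.
func noopAck(bool, error) {}

// countAcks acks one message twice and reports how often report ran.
func countAcks() int {
	calls := 0
	msg := Msg{
		Val: []byte("hello"),
		Ack: onceAck(func(bool, error) { calls++ }),
	}
	msg.Ack(true, nil)
	msg.Ack(true, nil) // second call is ignored
	return calls
}

func main() {
	quiet := Msg{Val: []byte("no ack needed"), Ack: noopAck}
	quiet.Ack(true, nil)
	fmt.Println(countAcks())
}
```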

func (*Msg) Clone ¶ added in v0.2.4

func (msg *Msg) Clone() Msg

Clone returns a clone of the original message. Ack function will be set to no-op in the clone. If the value implements Cloner, it will be used to clone the value.
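A clone that can be modified without affecting the original could look like the sketch below. Only the no-op Ack is stated in the documentation; copying the byte slices and the attribute map is an assumption about what "clone" means here, and `cloneMsg` is a hypothetical stand-in for the method.

```go
package main

import "fmt"

// Msg mirrors the documented struct so this sketch is self-contained.
type Msg struct {
	Key     []byte
	Val     []byte
	Attribs map[string]string
	Ack     func(success bool, cause error)
}

// cloneMsg is a hypothetical stand-in for Clone. It copies the payload
// slices and the attribute map (an assumption) and sets Ack to a no-op
// (as documented).
func cloneMsg(m Msg) Msg {
	c := Msg{
		Key: append([]byte(nil), m.Key...),
		Val: append([]byte(nil), m.Val...),
		Ack: func(bool, error) {},
	}
	if m.Attribs != nil {
		c.Attribs = make(map[string]string, len(m.Attribs))
		for k, v := range m.Attribs {
			c.Attribs[k] = v
		}
	}
	return c
}

// cloneIsIndependent mutates a clone and checks the original is untouched.
func cloneIsIndependent() bool {
	orig := Msg{Val: []byte("abc"), Attribs: map[string]string{"k": "v"}}
	c := cloneMsg(orig)
	c.Val[0] = 'x'
	c.Attribs["k"] = "changed"
	return string(orig.Val) == "abc" && orig.Attribs["k"] == "v"
}

func main() {
	fmt.Println(cloneIsIndependent())
}
```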

type Options ¶

type Options struct {
	// Workers represents the number of workers to launch for reading
	// from the source and invoking the Proc.
	Workers int

	// Proc represents the processor to be used by the fusion pipeline to
	// process the incoming messages. If not set, a default no-op will be
	// used.
	Proc Proc

	// OnFinish when set, will be called when a proc successfully processes
	// a message or fails by returning Fail or skips by returning Skip.
	OnFinish func(msg Msg, err error)

	// Logger to use for the fusion instance. If not set, no-op logger
	// will be set.
	Logger Logger

	// DrainWithin is the timeout to wait to properly drain the channel
	// and send NACKs for all messages when the fusion instance is exiting
	// before the stream is exhausted, for example due to a context
	// cancellation. If not set, the stream will not be drained.
	DrainWithin time.Duration
}

Options represents optional configuration values for the fusion instance.

type Proc ¶ added in v0.2.1

type Proc func(ctx context.Context, msg Msg) error

Proc represents a processor in the stream pipeline. A Proc returns nil, Skip, or Fail to indicate a terminal state, in which case the message is acknowledged. If it returns any other error, the message is nACKed.

type Source ¶ added in v0.2.0

type Source interface {
	// ConsumeFrom should return a channel to which it independently writes
	// the data stream. It is the responsibility of this Source to close
	// the returned channel once the data is exhausted. Goroutines spawned
	// by the source must be tied to the given context and exit when the
	// context is cancelled.
	ConsumeFrom(ctx context.Context) (<-chan Msg, error)
}

Source implementation is the source of data in a pipeline.

type SourceFunc ¶ added in v0.2.0

type SourceFunc func(ctx context.Context) (*Msg, error)

SourceFunc implements a source using a Go function value.

func (SourceFunc) ConsumeFrom ¶ added in v0.2.0

func (sf SourceFunc) ConsumeFrom(ctx context.Context) (<-chan Msg, error)

ConsumeFrom launches a goroutine that repeatedly calls the wrapped function and writes each returned message to the channel. It stops when ctx is cancelled or the function returns an error.

Directories ¶

Path Synopsis
examples
reactor module
