Documentation
¶
Index ¶
- Variables
- func AwaitTimeoutInsert[Type any, Status StatusType](t *testing.T, w *Workflow[Type, Status], foreignID, runID string, ...)
- func Marshal[T any](t *T) ([]byte, error)
- func MermaidDiagram[Type any, Status StatusType](w *Workflow[Type, Status], path string, d MermaidDirection) error
- func ParseTopic(topic string) (workflowName string, statusType int, err error)
- func Require[Type any, Status StatusType](t *testing.T, w *Workflow[Type, Status], foreignID string, waitFor Status, ...)
- func ToProto(r *WireRecord) *workflowpb.Record
- func Topic(workflowName string, statusType int) string
- func TriggerCallbackOn[Type any, Status StatusType, Payload any](t *testing.T, w *Workflow[Type, Status], foreignID, runID string, ...)
- func Unmarshal[T any](b []byte, t *T) error
- type API
- type Ack
- type AwaitOption
- type BuildOption
- type Builder
- func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, Status], to Status)
- func (b *Builder[Type, Status]) AddConnector(name string, c Consumer, cf ConnectorFunc[Type, Status], ...)
- func (b *Builder[Type, Status]) AddStep(from Status, c ConsumerFunc[Type, Status], to Status, opts ...StepOption)
- func (b *Builder[Type, Status]) AddTimeout(from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], ...)
- func (b *Builder[Type, Status]) AddWorkflowConnector(cd WorkflowConnectionDetails, filter ConnectorFilter, from Status, ...)
- func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, ...) *Workflow[Type, Status]
- type CallbackFunc
- type ConnectorConsumerFunc
- type ConnectorFilter
- type ConnectorFunc
- type ConnectorOption
- type Consumer
- type ConsumerFunc
- type ConsumerOption
- type ConsumerOptions
- type Cursor
- type Event
- type EventEmitter
- type EventFilter
- type EventStreamer
- type Header
- type MermaidDirection
- type MermaidFormat
- type MermaidTransition
- type Producer
- type Record
- type RecordStore
- type RoleScheduler
- type State
- type StatusType
- type StepOption
- type TestingRecordStore
- type Timeout
- type TimeoutFunc
- type TimeoutOption
- type TimeoutStore
- type TimerFunc
- type TriggerOption
- type WireRecord
- type Workflow
- func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID string, status Status, ...) (*Record[Type, Status], error)
- func (w *Workflow[Type, Status]) Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error
- func (w *Workflow[Type, Status]) Run(ctx context.Context)
- func (w *Workflow[Type, Status]) ScheduleTrigger(foreignID string, startingStatus Status, spec string, ...) error
- func (w *Workflow[Type, Status]) States() map[string]State
- func (w *Workflow[Type, Status]) Stop()
- func (w *Workflow[Type, Status]) Trigger(ctx context.Context, foreignID string, startingStatus Status, ...) (runID string, err error)
- type WorkflowConnectionDetails
Constants ¶
This section is empty.
Variables ¶
var ( ErrWorkflowShutdown = errors.New("workflow has been shutdown") ErrCursorNotFound = errors.New("cursor not found") ErrRecordNotFound = errors.New("record not found") ErrTimeoutNotFound = errors.New("timeout not found") ErrRunIDNotFound = errors.New("run ID not found") ErrWorkflowInProgress = errors.New("current workflow still in progress - retry once complete") ErrWorkflowNotRunning = errors.New("trigger failed - workflow is not running") ErrStatusProvidedNotConfigured = errors.New("status provided is not configured for workflow") )
Functions ¶
func AwaitTimeoutInsert ¶
func MermaidDiagram ¶
func MermaidDiagram[Type any, Status StatusType](w *Workflow[Type, Status], path string, d MermaidDirection) error
func ToProto ¶
func ToProto(r *WireRecord) *workflowpb.Record
func TriggerCallbackOn ¶
Types ¶
type API ¶
type API[Type any, Status StatusType] interface { // Trigger will kickstart a workflow for the provided foreignID starting from the provided starting status. There // is no limitation as to where you start the workflow from. For workflows that have data preceding the initial // trigger that needs to be used in the workflow, using WithInitialValue will allow you to provide pre-populated // fields of Type that can be accessed by the consumers. // // foreignID should not be random and should be deterministic for the thing that you are running the workflow for. // This especially helps when connecting other workflows as the foreignID is the only way to connect the streams. The // same goes for Callback as you will need the foreignID to connect the callback back to the workflow instance that // was run. Trigger(ctx context.Context, foreignID string, startingStatus Status, opts ...TriggerOption[Type, Status]) (runID string, err error) // ScheduleTrigger takes a cron spec and will call Trigger at the specified intervals. ScheduleTrigger is a blocking // call and will return ErrWorkflowNotRunning or ErrStatusProvidedNotConfigured to indicate that it cannot begin to // schedule. All schedule errors will be retried indefinitely. The same options are available for ScheduleTrigger // as they are for Trigger. ScheduleTrigger(ctx context.Context, foreignID string, startingStatus Status, spec string, opts ...TriggerOption[Type, Status]) error // Await is a blocking call that returns the typed Record when the workflow of the specified run ID reaches the // specified status. Await(ctx context.Context, foreignID, runID string, status Status, opts ...AwaitOption) (*Record[Type, Status], error) // Callback can be used if Builder.AddCallback has been defined for the provided status. The data in the reader // will be passed to the CallbackFunc that you specify and so the serialisation and deserialisation is in the // hands of the user. Callback(ctx context.Context, foreignID string, status Status, payload io.Reader) error // Run must be called in order to start up all the background consumers / consumers required to run the workflow. Run // only needs to be called once. Any subsequent calls to run are safe and are noop. Run(ctx context.Context) // Stop tells the workflow to shut down gracefully. Stop() }
type Ack ¶
type Ack func() error
Ack is used for the event streamer to update its cursor of what messages have been consumed. If Ack is not called then the event streamer, depending on implementation, will likely not keep track of which records / events have been consumed.
type AwaitOption ¶
type AwaitOption func(o *awaitOpts)
func WithPollingFrequency ¶
func WithPollingFrequency(d time.Duration) AwaitOption
type BuildOption ¶
type BuildOption func(w *buildOptions)
func WithClock ¶
func WithClock(c clock.Clock) BuildOption
func WithDebugMode ¶
func WithDebugMode() BuildOption
type Builder ¶
type Builder[Type any, Status StatusType] struct { // contains filtered or unexported fields }
func NewBuilder ¶
func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status]
func (*Builder[Type, Status]) AddCallback ¶
func (b *Builder[Type, Status]) AddCallback(from Status, fn CallbackFunc[Type, Status], to Status)
func (*Builder[Type, Status]) AddConnector ¶
func (b *Builder[Type, Status]) AddConnector(name string, c Consumer, cf ConnectorFunc[Type, Status], opts ...ConnectorOption)
func (*Builder[Type, Status]) AddStep ¶
func (b *Builder[Type, Status]) AddStep(from Status, c ConsumerFunc[Type, Status], to Status, opts ...StepOption)
func (*Builder[Type, Status]) AddTimeout ¶
func (b *Builder[Type, Status]) AddTimeout(from Status, timer TimerFunc[Type, Status], tf TimeoutFunc[Type, Status], to Status, opts ...TimeoutOption)
func (*Builder[Type, Status]) AddWorkflowConnector ¶
func (b *Builder[Type, Status]) AddWorkflowConnector(cd WorkflowConnectionDetails, filter ConnectorFilter, from Status, consumer ConnectorConsumerFunc[Type, Status], to Status, opts ...StepOption)
func (*Builder[Type, Status]) Build ¶
func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, timeoutStore TimeoutStore, roleScheduler RoleScheduler, opts ...BuildOption) *Workflow[Type, Status]
type CallbackFunc ¶
type ConnectorConsumerFunc ¶
type ConnectorFilter ¶
ConnectorFilter should return an empty string as the foreignID if the event should be filtered out / skipped, and it should be non-empty if event should be processed. The value of foreignID should match the foreignID of your workflow.
type ConnectorFunc ¶
type ConnectorOption ¶
type ConnectorOption func(co *connectorOptions)
func WithConnectorErrBackOff ¶
func WithConnectorErrBackOff(d time.Duration) ConnectorOption
func WithConnectorParallelCount ¶
func WithConnectorParallelCount(instances int) ConnectorOption
type ConsumerFunc ¶
type ConsumerFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status]) (bool, error)
ConsumerFunc provides a record that is expected to be modified if the data needs to change. If true is returned with a nil error then the record, along with its modifications, will be stored. If false is returned with a nil error then the record will not be stored and the event will be skipped and move onto the next event. If a non-nil error is returned then the consumer will back off and try again until a nil error occurs or the retry max has been reached if a Dead Letter Queue has been configured for the workflow.
func Not ¶
func Not[Type any, Status StatusType](c ConsumerFunc[Type, Status]) ConsumerFunc[Type, Status]
type ConsumerOption ¶
type ConsumerOption func(*ConsumerOptions)
func WithConsumerPollFrequency ¶
func WithConsumerPollFrequency(d time.Duration) ConsumerOption
func WithEventFilter ¶
func WithEventFilter(ef EventFilter) ConsumerOption
type ConsumerOptions ¶
type ConsumerOptions struct { PollFrequency time.Duration EventFilter EventFilter }
type Event ¶
type Event struct { // ID is a unique ID for the event generated by the event streamer. ID int64 // ForeignID refers to the ID of a record in the record store. ForeignID int64 // Type relates to the StatusType that the associated record changed to. Type int // Headers stores meta-data in a simple and easily queryable way. Headers map[Header]string // CreatedAt is the time that the event was produced and is generated by the event streamer. CreatedAt time.Time }
type EventEmitter ¶
EventEmitter is a function that gets called before committing the change to the store. The store needs to support transactions if it is implemented as an append only datastore to allow rolling back if the event fails to emit.
type EventFilter ¶
EventFilter can be passed to the event streaming implementation to allow specific consumers to have an earlier on filtering process. True is returned when the event should be skipped.
type EventStreamer ¶
type MermaidDirection ¶
type MermaidDirection string
const ( UnknownDirection MermaidDirection = "" TopToBottomDirection MermaidDirection = "TB" LeftToRightDirection MermaidDirection = "LR" RightToLeftDirection MermaidDirection = "RL" BottomToTopDirection MermaidDirection = "BT" )
type MermaidFormat ¶
type MermaidFormat struct { Direction MermaidDirection StartingPoints []string TerminalPoints []string Transitions []MermaidTransition }
type MermaidTransition ¶
type Record ¶
type Record[Type any, Status StatusType] struct { WireRecord Status Status Object *Type }
type RecordStore ¶
type RecordStore interface { // Store should create or update a record depending on whether the underlying store is mutable or append only. Store // should implement transactions if it is supported especially if the Store is append-only as a new ID for the // record will need to be passed to the event emitter. Store(ctx context.Context, record *WireRecord, eventEmitter EventEmitter) error Lookup(ctx context.Context, id int64) (*WireRecord, error) Latest(ctx context.Context, workflowName, foreignID string) (*WireRecord, error) }
type RoleScheduler ¶
type StepOption ¶
type StepOption func(so *stepOptions)
func WithParallelCount ¶
func WithParallelCount(instances int) StepOption
func WithStepErrBackOff ¶
func WithStepErrBackOff(d time.Duration) StepOption
func WithStepLagAlert ¶
func WithStepLagAlert(d time.Duration) StepOption
func WithStepPollingFrequency ¶
func WithStepPollingFrequency(d time.Duration) StepOption
type TestingRecordStore ¶
type TestingRecordStore interface { RecordStore Snapshots(workflowName, foreignID, runID string) []*WireRecord SetSnapshotOffset(workflowName, foreignID, runID string, offset int) SnapshotOffset(workflowName, foreignID, runID string) int }
type TimeoutFunc ¶
type TimeoutFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], now time.Time) (bool, error)
TimeoutFunc runs once the timeout has expired which is set by TimerFunc. If false is returned with a nil error then the timeout is skipped and not retried at a later date. If a non-nil error is returned the TimeoutFunc will be called again until a nil error is returned. If true is returned with a nil error then the provided record and any modifications made to it will be stored and the status updated - continuing the workflow.
type TimeoutOption ¶
type TimeoutOption func(so *timeoutOptions)
func WithTimeoutErrBackOff ¶
func WithTimeoutErrBackOff(d time.Duration) TimeoutOption
func WithTimeoutLagAlert ¶
func WithTimeoutLagAlert(d time.Duration) TimeoutOption
func WithTimeoutPollingFrequency ¶
func WithTimeoutPollingFrequency(d time.Duration) TimeoutOption
type TimeoutStore ¶
type TimeoutStore interface { Create(ctx context.Context, workflowName, foreignID, runID string, status int, expireAt time.Time) error Complete(ctx context.Context, id int64) error Cancel(ctx context.Context, id int64) error List(ctx context.Context, workflowName string) ([]Timeout, error) ListValid(ctx context.Context, workflowName string, status int, now time.Time) ([]Timeout, error) }
type TimerFunc ¶
type TimerFunc[Type any, Status StatusType] func(ctx context.Context, r *Record[Type, Status], now time.Time) (time.Time, error)
TimerFunc exists to allow the specification of when the timeout should expire dynamically. If not time is set then a timeout will not be created and the event will be skipped. If the time is set then a timeout will be created and once expired TimeoutFunc will be called. Any non-nil error will be retried with backoff.
func DurationTimerFunc ¶
func DurationTimerFunc[Type any, Status StatusType](duration time.Duration) TimerFunc[Type, Status]
func TimeTimerFunc ¶
func TimeTimerFunc[Type any, Status StatusType](t time.Time) TimerFunc[Type, Status]
type TriggerOption ¶
type TriggerOption[Type any, Status StatusType] func(o *triggerOpts[Type, Status])
func WithInitialValue ¶
func WithInitialValue[Type any, Status StatusType](t *Type) TriggerOption[Type, Status]
type WireRecord ¶
type WireRecord struct { ID int64 WorkflowName string ForeignID string RunID string Status int IsStart bool IsEnd bool Object []byte CreatedAt time.Time }
func UnmarshalRecord ¶
func UnmarshalRecord(b []byte) (*WireRecord, error)
func (*WireRecord) ProtoMarshal ¶
func (r *WireRecord) ProtoMarshal() ([]byte, error)
type Workflow ¶
type Workflow[Type any, Status StatusType] struct { Name string // contains filtered or unexported fields }
func (*Workflow[Type, Status]) ScheduleTrigger ¶
func (w *Workflow[Type, Status]) ScheduleTrigger(foreignID string, startingStatus Status, spec string, opts ...TriggerOption[Type, Status]) error
type WorkflowConnectionDetails ¶
type WorkflowConnectionDetails struct { WorkflowName string Status int Stream EventStreamer }