Documentation
¶
Index ¶
- func NextState(pipeline pipeline.Pipeline, state storage.ProcessState, ...) (newState storage.ProcessState, err error)
- type EnvelopeActorPair
- type Manager
- func (m *Manager) Close()
- func (m *Manager) Get(id api.UUID) (*Process, error)
- func (m *Manager) GetByTrigger(emitter api.Actor, envelopeID api.UUID) (*Process, error)
- func (m *Manager) GetEmitterEnvelopeStatus(emitter api.Actor, envelopeID api.UUID) api.Process_Status
- func (m *Manager) ListPostMortem(level api.LogLevel, status ...api.PMProcess_Status) ([]*Process, error)
- func (m *Manager) LookupOrCreate(emitter api.Actor, envelopeID api.UUID) (*Process, error)
- func (m *Manager) Purge(id api.UUID) error
- func (m *Manager) PurgeMany(ids []api.UUID) ([]api.UUID, error)
- type Process
- func (p *Process) ActorProcessingEnd(actorID api.UUID, nodeID string, status api.ActorProcessingState_Status)
- func (p Process) Cancel(reason string) error
- func (p *Process) Export(envelopeStorage storage.EnvelopeStorage, log xbus.Logger) (map[string]interface{}, error)
- func (p *Process) ExportStream(ctx context.Context, envelopeStorage storage.EnvelopeStorage, ...) error
- func (p *Process) GetPostMortemState() (storage.PostMortemState, error)
- func (p *Process) GetResponse() api.UUID
- func (p Process) GetStatus() api.Process_Status
- func (p Process) GetTargets(output storage.NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool) ([]api.EnvelopeTarget, error)
- func (p *Process) MatchPipeline(output string, eventTypes []string) error
- func (p Process) Pause() error
- func (p Process) Resume() error
- func (p *Process) SetPostMortemState(state storage.PostMortemState) error
- func (p *Process) UpdateReceiveStatus(envelopeID, actorID api.UUID, outputref storage.NodeOutputRef, ...) error
- type StateEngine
- func (e *StateEngine) EndProcessNode(node storage.ProcessNode, status api.ActorProcessingState_Status, ...)
- func (e *StateEngine) GetNodeState(node storage.ProcessNode) (storage.ProcessNodeState, storage.Node)
- func (e *StateEngine) GetState() storage.ProcessState
- func (e *StateEngine) HitInput(node storage.ProcessNode, input string, complete bool, closed bool, ...)
- func (e *StateEngine) HitOutput(node storage.ProcessNode, output string, status storage.ReceptionStatus, ...)
- func (e *StateEngine) SetNodeState(node storage.ProcessNode, state storage.ProcessNodeState)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NextState ¶
func NextState( pipeline pipeline.Pipeline, state storage.ProcessState, logentry storage.ProcessLogEntry, log xbus.Logger, ) (newState storage.ProcessState, err error)
NextState calculates the next state of a process after applying a log entry
Types ¶
type EnvelopeActorPair ¶
EnvelopeActorPair contains a pair (EnvelopeID, ActorID)
type Manager ¶
type Manager struct { PipelineManager *pipeline.Manager ActorStorage storage.ActorStorage ProcessStorage storage.ProcessStorage Log xbus.Logger CurrentProcessList []*Process }
Manager manages the processes
func NewManager ¶
func NewManager( pipelineManager *pipeline.Manager, actorStorage storage.ActorStorage, processStorage storage.ProcessStorage, logger xbus.Logger, ) *Manager
NewManager creates a process.Manager
func (*Manager) GetByTrigger ¶
GetByTrigger returns the process matching the given trigger emitter/envelope
func (*Manager) GetEmitterEnvelopeStatus ¶
func (m *Manager) GetEmitterEnvelopeStatus(emitter api.Actor, envelopeID api.UUID) api.Process_Status
GetEmitterEnvelopeStatus returns the status of the process triggered by the given emitter/envelope
func (*Manager) ListPostMortem ¶
func (m *Manager) ListPostMortem(level api.LogLevel, status ...api.PMProcess_Status) ([]*Process, error)
ListPostMortem returns dead processes
func (*Manager) LookupOrCreate ¶
LookupOrCreate returns the Process handling the given envelope, creating it if necessary
type Process ¶
type Process struct { ID api.UUID CreatedAt time.Time TriggerEmitterID api.UUID TriggerEnvelopeID api.UUID Pipeline *pipeline.Pipeline SourceRef storage.NodeOutputRef // contains filtered or unexported fields }
Process orchestrates a pipeline execution
func NewProcess ¶
func NewProcess( actorStorage storage.ActorStorage, emitterID, envelopeID api.UUID, pipeline *pipeline.Pipeline, source storage.NodeOutputRef, manager *Manager, ) *Process
NewProcess creates a process
func (*Process) ActorProcessingEnd ¶
func (p *Process) ActorProcessingEnd(actorID api.UUID, nodeID string, status api.ActorProcessingState_Status)
ActorProcessingEnd informs the process that the given actor has ended its processing on the given node, with the given status
func (*Process) Export ¶
func (p *Process) Export(envelopeStorage storage.EnvelopeStorage, log xbus.Logger) (map[string]interface{}, error)
Export exports all the data of the process, including envelopes and logs, as a map suitable for JSON or YAML export. This implementation is not big-envelope friendly and is now deprecated; see ExportStream.
func (*Process) ExportStream ¶
func (p *Process) ExportStream( ctx context.Context, envelopeStorage storage.EnvelopeStorage, jobStorage storage.JobStorage, logStorage storage.LogStorage, log xbus.Logger, out io.Writer, exportEnvelope bool, exportJobs bool, exportLogs bool, exportProcessLogs bool, envelopeSizeLimit int64, logLevel api.LogLevel, ) error
ExportStream exports the process as a stream, which supports big envelopes and logs
func (*Process) GetPostMortemState ¶
func (p *Process) GetPostMortemState() (storage.PostMortemState, error)
GetPostMortemState returns the postmortem state of the process
func (*Process) GetResponse ¶
GetResponse returns the process response envelope id if any
func (Process) GetStatus ¶
func (p Process) GetStatus() api.Process_Status
GetStatus returns the current status of the process
func (Process) GetTargets ¶
func (p Process) GetTargets(output storage.NodeOutputRef, envelopeID api.UUID, noRouteTableUpdate bool) ([]api.EnvelopeTarget, error)
GetTargets returns the targets for an envelope coming from a given emitter
func (*Process) MatchPipeline ¶
MatchPipeline attempts to find the pipeline that will drive the process
func (*Process) SetPostMortemState ¶
func (p *Process) SetPostMortemState(state storage.PostMortemState) error
SetPostMortemState sets the postmortem state of the process
func (*Process) UpdateReceiveStatus ¶
func (p *Process) UpdateReceiveStatus(envelopeID, actorID api.UUID, outputref storage.NodeOutputRef, status storage.ReceptionStatus) error
UpdateReceiveStatus is called whenever an envelope fragment is received from a node
type StateEngine ¶
type StateEngine struct {
// contains filtered or unexported fields
}
StateEngine provides an API to consistently modify a process state
func (*StateEngine) EndProcessNode ¶
func (e *StateEngine) EndProcessNode( node storage.ProcessNode, status api.ActorProcessingState_Status, timestamp time.Time, )
EndProcessNode signals the end of processing by a process node
func (*StateEngine) GetNodeState ¶
func (e *StateEngine) GetNodeState( node storage.ProcessNode, ) (storage.ProcessNodeState, storage.Node)
GetNodeState returns a copy of a node state and the node graph definition
func (*StateEngine) GetState ¶
func (e *StateEngine) GetState() storage.ProcessState
GetState returns the current state (which is expected to be consistent)
func (*StateEngine) HitInput ¶
func (e *StateEngine) HitInput( node storage.ProcessNode, input string, complete bool, closed bool, timestamp time.Time, )
HitInput signals a data emission to an input
func (*StateEngine) HitOutput ¶
func (e *StateEngine) HitOutput( node storage.ProcessNode, output string, status storage.ReceptionStatus, timestamp time.Time, )
HitOutput signals a data reception from an output
func (*StateEngine) SetNodeState ¶
func (e *StateEngine) SetNodeState(node storage.ProcessNode, state storage.ProcessNodeState)
SetNodeState updates a node state