flow

package module
v0.18.37 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2025 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WorkflowFilterWorkflowId            WorkflowFilterName = "WorkflowId"
	WorkflowFilterWorkflowType          WorkflowFilterName = "WorkflowType"
	WorkflowFilterTemporalScheduledById WorkflowFilterName = "TemporalScheduledById"
	WorkflowFilterTaskQueue             WorkflowFilterName = "TaskQueue"
	WorkflowFilterExecutionStatus       WorkflowFilterName = "ExecutionStatus"

	ExecutionStatusCompleted      = "Completed"
	ExecutionStatusFailed         = "Failed"
	ExecutionStatusRunning        = "Running"
	ExecutionStatusTerminated     = "Terminated"
	ExecutionStatusCanceled       = "Canceled"
	ExecutionStatusTimedOut       = "TimedOut"
	ExecutionStatusContinuedAsNew = "ContinuedAsNew"
)

Variables

This section is empty.

Functions

func AND

func AND(a, b string) string

func EQ

func EQ(name WorkflowFilterName, value string) string

func GT

func GT(name WorkflowFilterName, value string) string

func IN

func IN(name WorkflowFilterName, value ...string) string

func IsApplicationError added in v0.18.36

func IsApplicationError(err error) (bool, *temporal.ApplicationError)

IsApplicationError reports whether err is an ApplicationError.

func LT

func LT(name WorkflowFilterName, value string) string

func MutableSideEffect

func MutableSideEffect[T comparable](ctx Context, id string, fn func() T) (T, error)

func OR

func OR(a, b string) string

func SelectorAddFuture

func SelectorAddFuture[T any](s Selector, f Future[T], fn func(f Future[T]))

SelectorAddFuture registers a callback function to be called when a future is ready. The callback is called when Select(ctx) is called. The callback is called once per ready future even if Select is called multiple times for the same Selector instance.

func SelectorAddReceive

func SelectorAddReceive[
	T any,
	CH interface {
		Channel[T] | SignalChannel[T]
		underlying() workflow.ReceiveChannel
	},
](
	s Selector, ch CH, fn func(ch CH, more bool),
)

SelectorAddReceive registers a callback function to be called when a channel has a message to receive. The callback is called when Select(ctx) is called. The message is expected to be consumed by the callback function. The branch is automatically removed after the channel is closed and the callback function has been called once with the more parameter set to false.

func SideEffect

func SideEffect[T any](ctx Context, fn func() T) (T, error)

func StartsWith

func StartsWith(name WorkflowFilterName, value string) string

func WrapError

func WrapError(ctx context.Context, err error) error

Types

type Activity

type Activity[REQ, RES, STATE any] struct {
	Name string
	Fn   ActivityFunc[REQ, RES, STATE]
	// contains filtered or unexported fields
}

func NewActivity

func NewActivity[REQ, RES, STATE any](
	name string,
	fn ActivityFunc[REQ, RES, STATE],
) Activity[REQ, RES, STATE]

func (*Activity[REQ, RES, InitArg]) Execute

func (a *Activity[REQ, RES, InitArg]) Execute(ctx Context, req REQ, opts ExecuteActivityOptions) Future[RES]

func (*Activity[REQ, RES, InitArg]) ExecuteLocal

func (a *Activity[REQ, RES, InitArg]) ExecuteLocal(ctx Context, req REQ, opts ExecuteActivityOptions) Future[RES]

func (*Activity[REQ, RES, STATE]) Init

func (a *Activity[REQ, RES, STATE]) Init(sdk *SDK)

func (*Activity[REQ, RES, STATE]) InitWithState added in v0.18.33

func (a *Activity[REQ, RES, STATE]) InitWithState(sdk *SDK, state STATE)

type ActivityContext

type ActivityContext[REQ, RES, STATE any] struct {
	// contains filtered or unexported fields
}

func (ActivityContext[REQ, RES, STATE]) Context

func (ctx ActivityContext[REQ, RES, STATE]) Context() context.Context

func (ActivityContext[REQ, RES, STATE]) Info

func (ctx ActivityContext[REQ, RES, STATE]) Info() ActivityInfo

func (ActivityContext[REQ, RES, STATE]) Log

func (ctx ActivityContext[REQ, RES, STATE]) Log() log.Logger

func (ActivityContext[REQ, RES, STATE]) S added in v0.18.33

func (ctx ActivityContext[REQ, RES, STATE]) S() STATE

func (ActivityContext[REQ, RES, STATE]) SetState added in v0.18.33

func (ctx ActivityContext[REQ, RES, STATE]) SetState(state STATE)

func (ActivityContext[REQ, RES, STATE]) State added in v0.18.33

func (ctx ActivityContext[REQ, RES, STATE]) State() STATE

func (ActivityContext[REQ, RES, STATE]) WorkflowID

func (ctx ActivityContext[REQ, RES, STATE]) WorkflowID() string

type ActivityFunc

type ActivityFunc[REQ, RES, STATE any] func(ctx *ActivityContext[REQ, RES, STATE], req REQ) (*RES, error)

type ActivityInfo

type ActivityInfo = activity.Info

type CancelWorkflowRequest

type CancelWorkflowRequest struct {
	WorkflowID string
	RunID      string
}

type CancelWorkflowResponse

type CancelWorkflowResponse struct {
	Success bool
}

type Channel

type Channel[T any] struct {
	// contains filtered or unexported fields
}

func NewBufferedChannel added in v0.18.19

func NewBufferedChannel[T any](ctx Context, size int) Channel[T]

func NewChannel

func NewChannel[T any](ctx Context) Channel[T]

func NewNamedBufferedChannel added in v0.18.19

func NewNamedBufferedChannel[T any](ctx Context, name string, size int) Channel[T]

func NewNamedChannel added in v0.18.19

func NewNamedChannel[T any](ctx Context, name string) Channel[T]

func (Channel[T]) Close

func (ch Channel[T]) Close()

Close the Channel, and prohibit subsequent sends.

func (Channel[T]) Len

func (ch Channel[T]) Len() int

Len returns the number of buffered messages plus the number of blocked Send calls.

func (Channel[T]) Name

func (ch Channel[T]) Name() string

Name returns the name of the Channel. If the Channel was retrieved from a GetSignalChannel call, Name returns the signal name.

A Channel created without an explicit name will use a generated name by the SDK and is not deterministic.

func (Channel[T]) Receive

func (ch Channel[T]) Receive(ctx Context) (value T, more bool)

Receive blocks until a value is available and returns it as value. The more result is false when the Channel is closed. For example:

v, more := c.Receive(ctx)

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (Channel[T]) ReceiveAsync

func (ch Channel[T]) ReceiveAsync() (value T, ok bool)

ReceiveAsync tries to receive from the Channel without blocking. If data is available from the Channel, it returns the data as value and sets ok to true. Otherwise, it returns immediately with ok set to false.

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (Channel[T]) ReceiveAsyncWithMoreFlag

func (ch Channel[T]) ReceiveAsyncWithMoreFlag() (value T, ok bool, more bool)

ReceiveAsyncWithMoreFlag is the same as ReceiveAsync, with an extra return value, more, that indicates whether there could be more values from the Channel. more is false when the Channel is closed.

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (Channel[T]) ReceiveWithTimeout

func (ch Channel[T]) ReceiveWithTimeout(ctx Context, timeout time.Duration) (value T, ok, more bool)

ReceiveWithTimeout blocks for up to timeout until it receives a value, which it returns as value. The more result is false when the Channel is closed. The ok result is false when no value was found in the channel for the duration of timeout or the ctx was canceled; value should not be relied upon when ok is false. For example:

v, ok, more := c.ReceiveWithTimeout(ctx, time.Minute)

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (Channel[T]) Send

func (ch Channel[T]) Send(ctx Context, v T)

Send blocks until the data is sent.

func (Channel[T]) SendAsync

func (ch Channel[T]) SendAsync(v T) (ok bool)

SendAsync tries to send without blocking. It returns true if the data was sent; otherwise, it returns false.

type Config

type Config struct {
	HostPort  string
	Namespace string
	TaskQueue string
}

type Context

type Context = workflow.Context

type CountWorkflowRequest

type CountWorkflowRequest struct {
	Query string
}

type CountWorkflowResponse

type CountWorkflowResponse struct {
	Total  int64
	Counts map[string]int64
}

type CreateScheduleRequest

type CreateScheduleRequest struct {
	ID               string
	Action           ScheduleAction
	Spec             ScheduleSpec
	CatchupWindow    time.Duration
	RemainingActions int
	OverlapPolicy    ScheduleOverlapPolicy

	ExecutionTimeout time.Duration
	RunTimeout       time.Duration
	SearchAttributes SearchAttributes
	TimezoneName     string
}

type EMPTY

type EMPTY struct{}

type ExecuteActivityOptions

type ExecuteActivityOptions struct {
	// ScheduleToCloseTimeout - Total time that a workflow is willing to wait for an Activity to complete.
	// ScheduleToCloseTimeout limits the total time of an Activity's execution, including retries
	// 		(use StartToCloseTimeout to limit the time of a single attempt).
	// The zero value of this uses the default value.
	// Either this option or StartToCloseTimeout is required: Defaults to unlimited.
	ScheduleToCloseTimeout time.Duration

	// ScheduleToStartTimeout - Time that the Activity Task can stay in the Task Queue before it is picked up by
	// a Worker. Do not specify this timeout unless host-specific Task Queues for Activity Tasks are being
	// used for routing. In almost all situations that don't involve routing activities to specific hosts, it is
	// better to rely on the default value.
	// ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense, as it would
	// just put the Activity Task back into the same Task Queue.
	// Optional: Defaults to unlimited.
	ScheduleToStartTimeout time.Duration

	// StartToCloseTimeout - Maximum time of a single Activity execution attempt.
	// Note that the Temporal Server doesn't detect Worker process failures directly. It relies on this timeout
	// to detect that an Activity didn't complete on time. So this timeout should be as short as the longest
	// possible execution of the Activity body. Potentially long-running Activities must specify HeartbeatTimeout
	// and call Activity.RecordHeartbeat(ctx, "my-heartbeat") periodically for timely failure detection.
	// Either this option or ScheduleToCloseTimeout is required: Defaults to the ScheduleToCloseTimeout value.
	StartToCloseTimeout time.Duration
	RetryPolicy         *RetryPolicy
}

type ExecuteChildWorkflowOptions

type ExecuteChildWorkflowOptions struct {
	// WorkflowID of the child workflow to be scheduled.
	// Optional: an auto generated workflowID will be used if this is not provided.
	WorkflowID string

	// TaskQueue that the child workflow needs to be scheduled on.
	// Optional: the parent workflow task queue will be used if this is not provided.
	TaskQueue string

	// WorkflowExecutionTimeout - The end-to-end timeout for the child workflow execution including retries
	// and continue as new.
	// Optional: defaults to unlimited.
	WorkflowExecutionTimeout time.Duration

	// WorkflowRunTimeout - The timeout for a single run of the child workflow execution. Each retry or
	// continue as new should obey this timeout. Use WorkflowExecutionTimeout to specify how long the parent
	// is willing to wait for the child completion.
	// Optional: defaults to WorkflowExecutionTimeout
	WorkflowRunTimeout time.Duration

	// WorkflowTaskTimeout - Maximum execution time of a single Workflow Task. In the majority of cases there is
	// no need to change this timeout. Note that this timeout is not related to the overall Workflow duration in
	// any way. It defines for how long the Workflow can get blocked in the case of a Workflow Worker crash.
	// Default is 10 seconds. The Maximum value allowed by the Temporal Server is 1 minute.
	WorkflowTaskTimeout time.Duration

	// WaitForCancellation - Whether to wait for a canceled child workflow to be ended (child workflow can be ended
	// as: completed/failed/timeout/terminated/canceled)
	// Optional: default false
	WaitForCancellation bool

	// WorkflowIDReusePolicy - Whether the server allows reuse of the workflow ID; can be useful
	// for dedupe logic if set to WorkflowIdReusePolicyRejectDuplicate
	WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy

	// RetryPolicy specifies how to retry the child workflow if an error happens.
	// Optional: default is no retry
	RetryPolicy *RetryPolicy

	// ParentClosePolicy specifies how the child workflow gets terminated.
	// default is Terminate
	ParentClosePolicy enumspb.ParentClosePolicy
}

type ExecuteWorkflowOptions

type ExecuteWorkflowOptions struct {
	// ID – The business identifier of the workflow execution.
	// Optional: defaulted to an uuid.
	ID string
	// WorkflowExecutionTimeout – The timeout for the duration of workflow execution.
	// It includes retries and continues as new. Use WorkflowRunTimeout to limit the execution time
	// of a single workflow run.
	// The resolution is seconds.
	// Optional: defaulted to unlimited.
	WorkflowExecutionTimeout time.Duration

	// WorkflowRunTimeout – The timeout for the duration of a single workflow run.
	// The resolution is seconds.
	// Optional: defaulted to WorkflowExecutionTimeout.
	WorkflowRunTimeout time.Duration

	// WorkflowTaskTimeout – The timeout for processing a workflow task from the time the worker
	// pulled this task. If a workflow task is lost, it is retried after this timeout.
	// The resolution is seconds.
	// Optional: defaulted to 10 secs.
	WorkflowTaskTimeout time.Duration
	// StartDelay – Time to wait before dispatching the first workflow task.
	// If the workflow gets a signal before the delay, a workflow task will be dispatched and the rest
	// of the delay will be ignored. A signal from signal with start will not trigger a workflow task.
	// Cannot be set at the same time as a CronSchedule.
	StartDelay time.Duration
	// WorkflowIDReusePolicy
	// Specifies server behavior if a *completed* workflow with the same id exists.
	// This can be useful for dedupe logic if set to RejectDuplicate
	// Optional: defaulted to AllowDuplicate.
	WorkflowIDReusePolicy WorkflowIdReusePolicy

	// WorkflowIDConflictPolicy
	// Specifies server behavior if a *running* workflow with the same id exists.
	// This cannot be set if WorkflowIDReusePolicy is set to TerminateIfRunning.
	// Optional: defaulted to Fail.
	WorkflowIDConflictPolicy WorkflowIdConflictPolicy
}

type Future

type Future[M any] struct {
	// contains filtered or unexported fields
}

func ExecuteActivity

func ExecuteActivity[REQ, RES, IA any](
	ctx Context, act Activity[REQ, RES, IA], req REQ, opts ExecuteActivityOptions,
) Future[RES]

func ExecuteActivityLocal

func ExecuteActivityLocal[REQ, RES, IA any](
	ctx Context, act Activity[REQ, RES, IA], req REQ, opts ExecuteActivityOptions,
) Future[RES]

func (Future[M]) Get

func (f Future[M]) Get(ctx Context) (*M, error)

func (Future[M]) IsReady

func (f Future[M]) IsReady() bool

type GetWorkflowHistoryRequest

type GetWorkflowHistoryRequest struct {
	WorkflowID  string
	RunID       string
	Skip        int
	Limit       int
	OnlyLastOne bool
}

type GetWorkflowHistoryResponse

type GetWorkflowHistoryResponse struct {
	Events []HistoryEvent
}

type GetWorkflowRequest added in v0.18.34

type GetWorkflowRequest struct {
	WorkflowID string
	RunID      string
}

type HistoryEvent

type HistoryEvent struct {
	ID      int64
	Type    string
	Time    int64
	Payload map[string]any
}

type RetryPolicy

type RetryPolicy = temporal.RetryPolicy

type SDK

type SDK struct {
	// contains filtered or unexported fields
}

func NewSDK

func NewSDK(cfg Config) (*SDK, error)

func (*SDK) CancelWorkflow

func (sdk *SDK) CancelWorkflow(ctx context.Context, req CancelWorkflowRequest) (*CancelWorkflowResponse, error)

func (*SDK) CountWorkflows

func (sdk *SDK) CountWorkflows(ctx context.Context, req CountWorkflowRequest) (*CountWorkflowResponse, error)

func (*SDK) CreateSchedule

func (sdk *SDK) CreateSchedule(ctx context.Context, req CreateScheduleRequest) (ScheduleHandle, error)

func (*SDK) DeleteSchedule

func (sdk *SDK) DeleteSchedule(ctx context.Context, id string) error

func (*SDK) GetSchedule

func (sdk *SDK) GetSchedule(ctx context.Context, id string) ScheduleHandle

func (*SDK) GetWorkflow added in v0.18.34

func (sdk *SDK) GetWorkflow(ctx context.Context, req GetWorkflowRequest) (*WorkflowExecution, error)

func (*SDK) GetWorkflowHistory

func (sdk *SDK) GetWorkflowHistory(
	ctx context.Context, req GetWorkflowHistoryRequest,
) (*GetWorkflowHistoryResponse, error)

func (*SDK) ListSchedules

func (sdk *SDK) ListSchedules(ctx context.Context, query string, pageSize int) (ScheduleListIterator, error)

func (*SDK) SearchWorkflows

func (sdk *SDK) SearchWorkflows(ctx context.Context, req SearchWorkflowRequest) (*SearchWorkflowResponse, error)

func (*SDK) Signal added in v0.18.34

func (sdk *SDK) Signal(ctx context.Context, workflowID, signalName string, arg any) error

func (*SDK) Start

func (sdk *SDK) Start() error

func (*SDK) Stop

func (sdk *SDK) Stop()

func (*SDK) TaskQueue

func (sdk *SDK) TaskQueue() string

func (*SDK) TogglePause

func (sdk *SDK) TogglePause(ctx context.Context, id string, pause bool) error

func (*SDK) Trigger added in v0.18.23

func (sdk *SDK) Trigger(ctx context.Context, id string) error

func (*SDK) UpdateWorkflowRetentionPeriod

func (sdk *SDK) UpdateWorkflowRetentionPeriod(ctx context.Context, d time.Duration) error

type ScheduleAction

type ScheduleAction struct {
	WorkflowIDPrefix string
	WorkflowName     string
	WorkflowArg      any
	SearchAttributes SearchAttributes
	RetryPolicy      *RetryPolicy
}

type ScheduleActionResult

type ScheduleActionResult = client.ScheduleActionResult

type ScheduleEntry

type ScheduleEntry = client.ScheduleListEntry

type ScheduleHandle

type ScheduleHandle = client.ScheduleHandle

type ScheduleListIterator

type ScheduleListIterator = client.ScheduleListIterator

type ScheduleOverlapPolicy

type ScheduleOverlapPolicy int32

ScheduleOverlapPolicy controls what happens when a workflow would be started by a schedule and is already running.

const (
	ScheduleOverlapPolicyUnspecified ScheduleOverlapPolicy = 0
	// ScheduleOverlapPolicySkip (default) means don't start anything. When the
	// workflow completes, the next scheduled event after that time will be considered.
	ScheduleOverlapPolicySkip ScheduleOverlapPolicy = 1
	// ScheduleOverlapPolicyBufferOne means start the workflow again as soon as the
	// current one completes, but only buffer one start in this way. If another start is
	// supposed to happen when the workflow is running, and one is already buffered, then
	// only the first one will be started after the running workflow finishes.
	ScheduleOverlapPolicyBufferOne ScheduleOverlapPolicy = 2
	// ScheduleOverlapPolicyBufferAll means buffer up any number of starts to all
	// happen sequentially, immediately after the running workflow completes.
	ScheduleOverlapPolicyBufferAll ScheduleOverlapPolicy = 3
	// ScheduleOverlapPolicyCancelOther means that if there is another workflow
	// running, cancel it, and start the new one after the old one completes cancellation.
	ScheduleOverlapPolicyCancelOther ScheduleOverlapPolicy = 4
	// ScheduleOverlapPolicyTerminateOther means that if there is another workflow
	// running, terminate it and start the new one immediately.
	ScheduleOverlapPolicyTerminateOther ScheduleOverlapPolicy = 5
	// ScheduleOverlapPolicyAllowAll means to start any number of concurrent workflows.
	// Note that with this policy, the last completion result and
	// last failure will not be available since workflows are not sequential.
	ScheduleOverlapPolicyAllowAll ScheduleOverlapPolicy = 6
)

type ScheduleSpec

type ScheduleSpec struct {
	Second     int
	Minute     int
	Hour       int
	Month      int
	Year       int
	DayOfWeek  time.Weekday
	DayOfMonth int // between 1 and 31 inclusive

	StartTime time.Time
	EndTime   time.Time
	Jitter    time.Duration
	Timezone  string
}

type SearchAttributeUpdate

type SearchAttributeUpdate = temporal.SearchAttributeUpdate

func AttrBool

func AttrBool(name string, val bool) SearchAttributeUpdate

func AttrInt64

func AttrInt64(name string, val int64) SearchAttributeUpdate

func AttrKeywords

func AttrKeywords(name string, val []string) SearchAttributeUpdate

func AttrString

func AttrString(name string, val string) SearchAttributeUpdate

type SearchAttributes

type SearchAttributes = temporal.SearchAttributes

func NewSearchAttributes

func NewSearchAttributes(attributes ...SearchAttributeUpdate) SearchAttributes

type SearchWorkflowRequest

type SearchWorkflowRequest struct {
	NextPageToken []byte
	Query         string
}

type SearchWorkflowResponse

type SearchWorkflowResponse struct {
	Executions    []WorkflowExecution
	NextPageToken []byte
}

type Selector

type Selector workflow.Selector

type Signal added in v0.18.33

type Signal[T any] struct {
	Name string
}

func (Signal[T]) GetChannel added in v0.18.33

func (s Signal[T]) GetChannel(ctx Context) SignalChannel[T]

func (Signal[T]) Send added in v0.18.33

func (s Signal[T]) Send(ctx Context, workflowID string, arg T)

type SignalChannel added in v0.18.33

type SignalChannel[T any] struct {
	// contains filtered or unexported fields
}

func NewSignalChannel added in v0.18.33

func NewSignalChannel[T any](ctx Context, name string) SignalChannel[T]

func (SignalChannel[T]) Receive added in v0.18.33

func (ch SignalChannel[T]) Receive(ctx Context) (value T, more bool)

Receive blocks until a value is available and returns it as value. The more result is false when the Channel is closed. For example:

v, more := c.Receive(ctx)

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (SignalChannel[T]) ReceiveAsync added in v0.18.33

func (ch SignalChannel[T]) ReceiveAsync() (value T, ok bool)

ReceiveAsync tries to receive from the Channel without blocking. If data is available from the Channel, it returns the data as value and sets ok to true. Otherwise, it returns immediately with ok set to false.

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (SignalChannel[T]) ReceiveAsyncWithMoreFlag added in v0.18.33

func (ch SignalChannel[T]) ReceiveAsyncWithMoreFlag() (value T, ok bool, more bool)

ReceiveAsyncWithMoreFlag is the same as ReceiveAsync, with an extra return value, more, that indicates whether there could be more values from the Channel. more is false when the Channel is closed.

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

func (SignalChannel[T]) ReceiveWithTimeout added in v0.18.33

func (ch SignalChannel[T]) ReceiveWithTimeout(ctx Context, timeout time.Duration) (value T, ok, more bool)

ReceiveWithTimeout blocks for up to timeout until it receives a value, which it returns as value. The more result is false when the Channel is closed. The ok result is false when no value was found in the channel for the duration of timeout or the ctx was canceled; value should not be relied upon when ok is false. For example:

v, ok, more := c.ReceiveWithTimeout(ctx, time.Minute)

Note, values should not be reused for extraction here because merging on top of existing values may result in unexpected behavior similar to json.Unmarshal.

type UpdateNamespaceRequest

type UpdateNamespaceRequest struct {
	Description                      *string
	WorkflowExecutionRetentionPeriod *time.Duration
}

type WaitGroup

type WaitGroup = workflow.WaitGroup

type Workflow

type Workflow[REQ, RES, STATE any] struct {
	Name  string
	State STATE
	Fn    WorkflowFunc[REQ, RES, STATE]
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow[REQ, RES, STATE any](
	name string,
	fn WorkflowFunc[REQ, RES, STATE],
) Workflow[REQ, RES, STATE]

func NewWorkflowWithState added in v0.18.33

func NewWorkflowWithState[REQ, RES, STATE any](
	name string, state STATE,
	fn WorkflowFunc[REQ, RES, STATE],
) Workflow[REQ, RES, STATE]

func (*Workflow[REQ, RES, STATE]) Execute

func (w *Workflow[REQ, RES, STATE]) Execute(
	ctx context.Context, req REQ, opts ExecuteWorkflowOptions,
) (*WorkflowRun[RES], error)

func (*Workflow[REQ, RES, STATE]) ExecuteAsChild

func (w *Workflow[REQ, RES, STATE]) ExecuteAsChild(
	ctx Context,
	req REQ,
	opts ExecuteChildWorkflowOptions,
) WorkflowFuture[RES]

func (*Workflow[REQ, RES, STATE]) Init

func (w *Workflow[REQ, RES, STATE]) Init(sdk *SDK)

func (*Workflow[REQ, RES, STATE]) InitWithState added in v0.18.33

func (w *Workflow[REQ, RES, STATE]) InitWithState(sdk *SDK, s STATE)

type WorkflowContext

type WorkflowContext[REQ, RES, STATE any] struct {
	// contains filtered or unexported fields
}

func (WorkflowContext[REQ, RES, STATE]) Context

func (ctx WorkflowContext[REQ, RES, STATE]) Context() workflow.Context

func (WorkflowContext[REQ, RES, STATE]) Info

func (ctx WorkflowContext[REQ, RES, STATE]) Info() *WorkflowInfo

func (WorkflowContext[REQ, RES, STATE]) Log

func (ctx WorkflowContext[REQ, RES, STATE]) Log() log.Logger

func (WorkflowContext[REQ, RES, STATE]) NamedSelector added in v0.18.33

func (ctx WorkflowContext[REQ, RES, STATE]) NamedSelector(name string) Selector

func (WorkflowContext[REQ, RES, STATE]) S added in v0.18.33

func (ctx WorkflowContext[REQ, RES, STATE]) S() STATE

func (WorkflowContext[REQ, RES, STATE]) Selector

func (ctx WorkflowContext[REQ, RES, STATE]) Selector() Selector

func (*WorkflowContext[REQ, RES, STATE]) SetState added in v0.18.33

func (ctx *WorkflowContext[REQ, RES, STATE]) SetState(s STATE)

func (WorkflowContext[REQ, RES, STATE]) Sleep

func (ctx WorkflowContext[REQ, RES, STATE]) Sleep(d time.Duration) error

func (WorkflowContext[REQ, RES, STATE]) State added in v0.18.33

func (ctx WorkflowContext[REQ, RES, STATE]) State() STATE

func (WorkflowContext[REQ, RES, STATE]) Timer

func (ctx WorkflowContext[REQ, RES, STATE]) Timer(d time.Duration) Future[temporal.CanceledError]

func (WorkflowContext[REQ, RES, STATE]) WaitGroup

func (ctx WorkflowContext[REQ, RES, STATE]) WaitGroup() WaitGroup

type WorkflowExecution

type WorkflowExecution struct {
	Name        string
	WorkflowID  string
	RunID       string
	HistorySize int64
	Memo        string
	StartTime   time.Time
	CloseTime   time.Time
	Duration    time.Duration
	Status      string
}

type WorkflowFilterName

type WorkflowFilterName string

type WorkflowFunc

type WorkflowFunc[REQ, RES, STATE any] func(ctx *WorkflowContext[REQ, RES, STATE], req REQ) (*RES, error)

type WorkflowFuture

type WorkflowFuture[M any] struct {
	// contains filtered or unexported fields
}

func (WorkflowFuture[M]) Get

func (f WorkflowFuture[M]) Get(ctx Context) (*M, error)

func (WorkflowFuture[M]) IsReady

func (f WorkflowFuture[M]) IsReady() bool

type WorkflowIdConflictPolicy

type WorkflowIdConflictPolicy int32

WorkflowIdConflictPolicy defines what to do when trying to start a workflow with the same workflow id as a *running* workflow. Note that it is *never* valid to have two actively running instances of the same workflow id.

See `WorkflowIdReusePolicy` for handling workflow id duplication with a *closed* workflow.

const (
	WorkflowIdConflictPolicyUnspecified WorkflowIdConflictPolicy = 0
	// WorkflowIdConflictPolicyFail
	// Don't start a new workflow; instead return `WorkflowExecutionAlreadyStartedFailure`.
	WorkflowIdConflictPolicyFail WorkflowIdConflictPolicy = 1
	// WorkflowIdConflictPolicyUseExisting
	// Don't start a new workflow; instead return a workflow handle for the running workflow.
	WorkflowIdConflictPolicyUseExisting WorkflowIdConflictPolicy = 2
	// WorkflowIdConflictPolicyTerminateExisting
	// Terminate the running workflow before starting a new one.
	WorkflowIdConflictPolicyTerminateExisting WorkflowIdConflictPolicy = 3
)

type WorkflowIdReusePolicy

type WorkflowIdReusePolicy int32

WorkflowIdReusePolicy defines whether to allow re-using a workflow id from a previously *closed* workflow. If the request is denied, a `WorkflowExecutionAlreadyStartedFailure` is returned.

See `WorkflowIdConflictPolicy` for handling workflow id duplication with a *running* workflow.

const (
	WorkflowIdReusePolicyUnspecified WorkflowIdReusePolicy = 0
	// WorkflowIdReusePolicyAllowDuplicate
	// Allow starting a workflow execution using the same workflow id.
	WorkflowIdReusePolicyAllowDuplicate WorkflowIdReusePolicy = 1
	// WorkflowIdReusePolicyAllowDuplicateFailedOnly
	// Allow starting a workflow execution using the same workflow id, only when the last
	// execution's final state is one of [terminated, cancelled, timed out, failed].
	WorkflowIdReusePolicyAllowDuplicateFailedOnly WorkflowIdReusePolicy = 2
	// WorkflowIdReusePolicyRejectDuplicate
	// Do not permit re-use of the workflow id for this workflow. Future start workflow requests
	// could potentially change the policy, allowing re-use of the workflow id.
	WorkflowIdReusePolicyRejectDuplicate WorkflowIdReusePolicy = 3
	// WorkflowIdReusePolicyTerminateIfRunning
	// This option belongs in WorkflowIdConflictPolicy but is here for backwards compatibility.
	// If specified, it acts like ALLOW_DUPLICATE, but also the WorkflowId*Conflict*Policy on
	// the request is treated as WorkflowIdConflictPolicyTerminateExisting.
	// If no running workflow, then the behavior is the same as ALLOW_DUPLICATE.
	WorkflowIdReusePolicyTerminateIfRunning WorkflowIdReusePolicy = 4
)

type WorkflowInfo

type WorkflowInfo = workflow.Info

type WorkflowRun

type WorkflowRun[T any] struct {
	ID    string
	RunID string
	// contains filtered or unexported fields
}

func (WorkflowRun[T]) Get

func (x WorkflowRun[T]) Get(ctx context.Context) (*T, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳