temporalnexus

package
v1.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2025 License: MIT Imports: 17 Imported by: 6

Documentation

Overview

Package temporalnexus provides utilities for exposing Temporal constructs as Nexus Operations.

Nexus RPC is a modern open-source service framework for arbitrary-length operations whose lifetime may extend beyond a traditional RPC. Nexus was designed with durable execution in mind, as an underpinning to connect durable executions within and across namespaces, clusters and regions – with a clean API contract to streamline multi-team collaboration. Any service can be exposed as a set of sync or async Nexus operations – the latter provides an operation identity and a uniform interface to get the status of an operation or its result, receive a completion callback, or cancel the operation.

Temporal leverages the Nexus RPC protocol to facilitate calling across namespace and cluster and boundaries.

See also:

Nexus over HTTP Spec: https://github.com/nexus-rpc/api/blob/main/SPEC.md

Nexus Go SDK: https://github.com/nexus-rpc/sdk-go

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertLinkWorkflowEventToNexusLink(we *commonpb.Link_WorkflowEvent) nexus.Link

ConvertLinkWorkflowEventToNexusLink converts a Link_WorkflowEvent type to Nexus Link.

NOTE: Experimental

func ConvertNexusLinkToLinkWorkflowEvent added in v1.29.0

func ConvertNexusLinkToLinkWorkflowEvent(link nexus.Link) (*commonpb.Link_WorkflowEvent, error)

ConvertNexusLinkToLinkWorkflowEvent converts a Nexus Link to Link_WorkflowEvent.

NOTE: Experimental

func GetClient added in v1.33.0

func GetClient(ctx context.Context) client.Client

GetClient returns a client to be used in a Nexus operation's context, this is the same client that the worker was created with. Client methods will panic when called from the test environment.

func GetLogger added in v1.29.0

func GetLogger(ctx context.Context) log.Logger

GetLogger returns a logger to be used in a Nexus operation's context.

func GetMetricsHandler added in v1.29.0

func GetMetricsHandler(ctx context.Context) metrics.Handler

GetMetricsHandler returns a metrics handler to be used in a Nexus operation's context.

func MustNewWorkflowRunOperationWithOptions

func MustNewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) nexus.Operation[I, O]

MustNewWorkflowRunOperation map an operation to a workflow run with the given options. Panics if invalid options are provided.

func NewSyncOperation deprecated

func NewSyncOperation[I any, O any](
	name string,
	handler func(context.Context, client.Client, I, nexus.StartOperationOptions) (O, error),
) nexus.Operation[I, O]

NewSyncOperation is a helper for creating a synchronous-only nexus.Operation from a given name and handler function. The handler is passed the client that the worker was created with. Sync operations are useful for exposing short-lived Temporal client requests, such as signals, queries, sync update, list workflows, etc...

Deprecated: Use nexus.NewSyncOperation and get the client via temporalnexus.GetClient

Example
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
)

type MyInput struct {
	ID string
}

type MyQueryOutput struct {
}

func main() {
	opRead := nexus.NewSyncOperation("my-read-only-operation", func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (MyQueryOutput, error) {
		var ret MyQueryOutput
		res, err := temporalnexus.GetClient(ctx).QueryWorkflow(ctx, input.ID, "", "some-query", nil)
		if err != nil {
			return ret, err
		}
		return ret, res.Get(&ret)
	})

	// Operations don't have to return values.
	opWrite := nexus.NewSyncOperation("my-write-operation", func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (nexus.NoValue, error) {
		return nil, temporalnexus.GetClient(ctx).SignalWorkflow(ctx, input.ID, "", "some-signal", nil)
	})

	service := nexus.NewService("my-service")
	_ = service.Register(opRead, opWrite)

	c, _ := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterNexusService(service)
}
Output:

func NewWorkflowRunOperation

func NewWorkflowRunOperation[I, O any](
	name string,
	workflow func(workflow.Context, I) (O, error),
	getOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error),
) nexus.Operation[I, O]

NewWorkflowRunOperation maps an operation to a workflow run.

Example
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

type MyOutput struct {
}

type MyInput struct {
	ID string
}

func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error) {
	return MyOutput{}, nil
}

func main() {
	op := temporalnexus.NewWorkflowRunOperation(
		"my-async-operation",
		MyHandlerWorkflow,
		func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
			return client.StartWorkflowOptions{
				// Workflow ID is required and must be deterministically generated from the input in order
				// for the operation to be idempotent as the request to start the operation may be retried.
				ID: input.ID,
			}, nil
		})

	service := nexus.NewService("my-service")
	_ = service.Register(op)

	c, _ := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyHandlerWorkflow)
	w.RegisterNexusService(service)
}
Output:

func NewWorkflowRunOperationWithOptions

func NewWorkflowRunOperationWithOptions[I, O any](options WorkflowRunOperationOptions[I, O]) (nexus.Operation[I, O], error)

NewWorkflowRunOperation map an operation to a workflow run with the given options. Returns an error if invalid options are provided.

Example
package main

import (
	"context"

	"github.com/nexus-rpc/sdk-go/nexus"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporalnexus"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

type MyWorkflowInput struct {
}

type MyOutput struct {
}

type MyInput struct {
	ID string
}

func MyHandlerWorkflow(workflow.Context, MyInput) (MyOutput, error) {
	return MyOutput{}, nil
}

func MyHandlerWorkflowWithAlternativeInput(workflow.Context, MyWorkflowInput) (MyOutput, error) {
	return MyOutput{}, nil
}

func main() {
	// Alternative 1 - long form version of NewWorkflowRunOperation.
	opAlt1, _ := temporalnexus.NewWorkflowRunOperationWithOptions(
		temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
			Name:     "my-async-op-1",
			Workflow: MyHandlerWorkflow,
			GetOptions: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
				return client.StartWorkflowOptions{
					// Workflow ID is required and must be deterministically generated from the input in order
					// for the operation to be idempotent as the request to start the operation may be retried.
					ID: input.ID,
				}, nil
			},
		})

	// Alternative 2 - start a workflow with alternative inputs.
	opAlt2, _ := temporalnexus.NewWorkflowRunOperationWithOptions(
		temporalnexus.WorkflowRunOperationOptions[MyInput, MyOutput]{
			Name: "my-async-op-2",
			Handler: func(ctx context.Context, input MyInput, opts nexus.StartOperationOptions) (temporalnexus.WorkflowHandle[MyOutput], error) {
				// Workflows started with this API must take a single input and return single output.
				// To start workflows with different signatures, use ExecuteUntypedWorkflow.
				return temporalnexus.ExecuteWorkflow(ctx, opts, client.StartWorkflowOptions{
					// Workflow ID is required and must be deterministically generated from the input in order
					// for the operation to be idempotent as the request to start the operation may be retried.
					ID: input.ID,
				}, MyHandlerWorkflowWithAlternativeInput, MyWorkflowInput{})
			},
		})

	service := nexus.NewService("my-service")
	_ = service.Register(opAlt1, opAlt2)

	c, _ := client.Dial(client.Options{
		HostPort:  "localhost:7233",
		Namespace: "my-namespace",
	})
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyHandlerWorkflow)
	w.RegisterWorkflow(MyHandlerWorkflowWithAlternativeInput)
	w.RegisterNexusService(service)
}
Output:

Types

type WorkflowHandle

type WorkflowHandle[T any] interface {
	// ID is the workflow's ID.
	ID() string
	// ID is the workflow's run ID.
	RunID() string
	// contains filtered or unexported methods
}

WorkflowHandle is a readonly representation of a workflow run backing a Nexus operation. It's created via the ExecuteWorkflow and ExecuteUntypedWorkflow methods.

func ExecuteUntypedWorkflow

func ExecuteUntypedWorkflow[R any](
	ctx context.Context,
	nexusOptions nexus.StartOperationOptions,
	startWorkflowOptions client.StartWorkflowOptions,
	workflow any,
	args ...any,
) (WorkflowHandle[R], error)

ExecuteUntypedWorkflow starts a workflow with by function reference or string name, linking the execution chain to a Nexus operation. Useful for invoking workflows that don't follow the single argument - single return type signature. See ExecuteWorkflow for more information.

func ExecuteWorkflow

func ExecuteWorkflow[I, O any, WF func(workflow.Context, I) (O, error)](
	ctx context.Context,
	nexusOptions nexus.StartOperationOptions,
	startWorkflowOptions client.StartWorkflowOptions,
	workflow WF,
	arg I,
) (WorkflowHandle[O], error)

ExecuteWorkflow starts a workflow run for a WorkflowRunOperationOptions Handler, linking the execution chain to a Nexus operation (subsequent runs started from continue-as-new and retries). Automatically propagates the callback and request ID from the nexus options to the workflow.

type WorkflowRunOperationOptions

type WorkflowRunOperationOptions[I, O any] struct {
	// Operation name.
	Name string
	// Workflow function to map this operation to. The operation input maps directly to workflow input.
	// The workflow name is resolved as it would when using this function in client.ExecuteOperation.
	// GetOptions must be provided when setting this option. Mutually exclusive with Handler.
	Workflow func(workflow.Context, I) (O, error)
	// Options for starting the workflow. Must be set if Workflow is set. Mutually exclusive with Handler.
	// The options returned must include a workflow ID that is deterministically generated from the input in order
	// for the operation to be idempotent as the request to start the operation may be retried.
	// TaskQueue is optional and defaults to the current worker's task queue.
	// WorkflowExecutionErrorWhenAlreadyStarted is ignored and always set to true.
	// WorkflowIDConflictPolicy is by default set to fail if a workflow is already running. That is,
	// if a caller executes another operation that starts the same workflow, it will fail. You can set
	// it to WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING to attach the caller's callback to the existing
	// running workflow. This way, all attached callers will be notified when the workflow completes.
	GetOptions func(context.Context, I, nexus.StartOperationOptions) (client.StartWorkflowOptions, error)
	// Handler for starting a workflow with a different input than the operation. Mutually exclusive with Workflow
	// and GetOptions.
	Handler func(context.Context, I, nexus.StartOperationOptions) (WorkflowHandle[O], error)
}

WorkflowRunOperationOptions are options for NewWorkflowRunOperationWithOptions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳