Documentation
¶
Index ¶
- Constants
- func ArgsOf(args ...interface{}) []interface{}
- func IdentityKeyOfFunction(fun interface{}) string
- func ValueFromConetxt(ctx context.Context, key string) string
- func WithValues(ctx context.Context, kvs map[string]string) context.Context
- type Backend
- type Client
- type CronClient
- type DefaultClient
- func (c *DefaultClient) ListTasks(ctx context.Context, group, name string) ([]Task, error)
- func (c *DefaultClient) RemoveTask(ctx context.Context, group, name string, uid string) error
- func (c *DefaultClient) SubmitTask(ctx context.Context, task Task) error
- func (c *DefaultClient) WatchTasks(ctx context.Context, group, name string, ...) error
- type InmemoryBackend
- func (t *InmemoryBackend) Del(ctx context.Context, key string) error
- func (t *InmemoryBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (t *InmemoryBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)
- func (t *InmemoryBackend) Pub(ctx context.Context, name string, key string, val []byte) error
- func (t *InmemoryBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
- func (t *InmemoryBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
- func (t *InmemoryBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error
- type OnChangeFunc
- type Options
- type RedisBackend
- func (b *RedisBackend) Del(ctx context.Context, key string) error
- func (b *RedisBackend) Get(ctx context.Context, key string) ([]byte, error)
- func (b *RedisBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)
- func (b *RedisBackend) Pub(ctx context.Context, name string, key string, val []byte) error
- func (b *RedisBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
- func (b *RedisBackend) Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
- func (b *RedisBackend) Watch(ctx context.Context, key string, onchange OnChangeFunc) error
- type RemoteClient
- func (r *RemoteClient) ListTasks(ctx context.Context, group string, name string) ([]Task, error)
- func (r *RemoteClient) RemoveTask(ctx context.Context, group string, name string, uid string) error
- func (r *RemoteClient) SubmitTask(ctx context.Context, task Task) error
- func (r *RemoteClient) WatchTasks(ctx context.Context, group string, name string, ...) error
- type RemoteClientServer
- type RuntimeValuesContext
- type Server
- type Step
- type SubOption
- type SubOptions
- type Task
- type TaskStatus
- type TaskStatusCode
Constants ¶
View Source
const (
DefaultGroup = "workflow-group"
)
View Source
const (
DefaultTaskTimeout = 5 * time.Minute
)
Variables ¶
This section is empty.
Functions ¶
func IdentityKeyOfFunction ¶
func IdentityKeyOfFunction(fun interface{}) string
func ValueFromConetxt ¶
func ValueFromConetxt(ctx context.Context, key string) string
func WithValues ¶
func WithValues(ctx context.Context, kvs map[string]string) context.Context
Types ¶
type Backend ¶
type Backend interface {
// 队列
Sub(ctx context.Context, name string, onchange OnChangeFunc, opts ...SubOption) error
// 这里的sub要求多个消费者共享同一个topic下的数据,且无重复。
Pub(ctx context.Context, name string, key string, val []byte) error
// kv存储
Get(ctx context.Context, key string) ([]byte, error)
Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
Del(ctx context.Context, key string) error
List(ctx context.Context, keyprefix string) (map[string][]byte, error)
Watch(ctx context.Context, key string, onchange OnChangeFunc) error
}
type Client ¶
type Client interface {
SubmitTask(ctx context.Context, task Task) error
ListTasks(ctx context.Context, group, name string) ([]Task, error)
RemoveTask(ctx context.Context, group, name string, uid string) error
WatchTasks(ctx context.Context, group, name string, onchange func(ctx context.Context, task *Task) error) error
}
func NewClientFromBackend ¶
func NewClientFromBackend(backend Backend) Client
type CronClient ¶ added in v1.24.4
type CronClient struct {
Client
// contains filtered or unexported fields
}
func NewCronSubmiter ¶ added in v1.24.4
func NewCronSubmiter(client Client) *CronClient
func (*CronClient) SubmitCronTask ¶ added in v1.24.4
func (s *CronClient) SubmitCronTask(ctx context.Context, task Task, crontabexp string) error
type DefaultClient ¶ added in v1.24.4
type DefaultClient struct {
// contains filtered or unexported fields
}
func (*DefaultClient) ListTasks ¶ added in v1.24.4
func (c *DefaultClient) ListTasks(ctx context.Context, group, name string) ([]Task, error)
func (*DefaultClient) RemoveTask ¶ added in v1.24.4
func (c *DefaultClient) RemoveTask(ctx context.Context, group, name string, uid string) error
func (*DefaultClient) SubmitTask ¶ added in v1.24.4
func (c *DefaultClient) SubmitTask(ctx context.Context, task Task) error
func (*DefaultClient) WatchTasks ¶ added in v1.24.4
func (c *DefaultClient) WatchTasks(ctx context.Context, group, name string, onchange func(ctx context.Context, task *Task) error) error
type InmemoryBackend ¶ added in v1.24.4
type InmemoryBackend struct {
// contains filtered or unexported fields
}
worker 仅允许一个实例启动,并且队列中的任务仅存在内存中,不支持持久化。
func NewInmemoryBackend ¶ added in v1.24.4
func NewInmemoryBackend(ctx context.Context) *InmemoryBackend
func (*InmemoryBackend) Del ¶ added in v1.24.4
func (t *InmemoryBackend) Del(ctx context.Context, key string) error
Del implements Backend.
func (*InmemoryBackend) Get ¶ added in v1.24.4
func (t *InmemoryBackend) Get(ctx context.Context, key string) ([]byte, error)
Get implements Backend.
func (*InmemoryBackend) List ¶ added in v1.24.4
func (t *InmemoryBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)
List implements Backend.
func (*InmemoryBackend) Pub ¶ added in v1.24.4
func (t *InmemoryBackend) Pub(ctx context.Context, name string, key string, val []byte) error
Pub implements Backend.
func (*InmemoryBackend) Put ¶ added in v1.24.4
func (t *InmemoryBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
Put implements Backend.
type OnChangeFunc ¶
type OnChangeFunc func(ctx context.Context, key string, val []byte) error
type Options ¶
type Options struct {
Addr string `json:"addr,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
type RedisBackend ¶
type RedisBackend struct {
// contains filtered or unexported fields
}
func NewRedisBackendFromClient ¶
func NewRedisBackendFromClient(c *redis.Client) *RedisBackend
func (*RedisBackend) Get ¶
func (b *RedisBackend) Get(ctx context.Context, key string) ([]byte, error)
func (*RedisBackend) List ¶
func (b *RedisBackend) List(ctx context.Context, keyprefix string) (map[string][]byte, error)
func (*RedisBackend) Pub ¶
func (b *RedisBackend) Pub(ctx context.Context, name string, key string, val []byte) error
func (*RedisBackend) Put ¶
func (b *RedisBackend) Put(ctx context.Context, key string, val []byte, ttl ...time.Duration) error
kv存储
type RemoteClient ¶ added in v1.24.4
type RemoteClient struct {
Address string
// contains filtered or unexported fields
}
func NewDefaultRemoteClient ¶ added in v1.24.4
func NewDefaultRemoteClient() *RemoteClient
func NewRemoteClient ¶ added in v1.24.4
func NewRemoteClient(address string) *RemoteClient
func (*RemoteClient) ListTasks ¶ added in v1.24.4
func (r *RemoteClient) ListTasks(ctx context.Context, group string, name string) ([]Task, error)
ListTasks implements Client.
func (*RemoteClient) RemoveTask ¶ added in v1.24.4
func (r *RemoteClient) RemoveTask(ctx context.Context, group string, name string, uid string) error
RemoveTask implements Client.
func (*RemoteClient) SubmitTask ¶ added in v1.24.4
func (r *RemoteClient) SubmitTask(ctx context.Context, task Task) error
SubmitTask implements Client.
func (*RemoteClient) WatchTasks ¶ added in v1.24.4
func (r *RemoteClient) WatchTasks(ctx context.Context, group string, name string, onchange func(ctx context.Context, task *Task) error) error
WatchTasks implements Client.
type RemoteClientServer ¶ added in v1.24.4
type RemoteClientServer struct {
Client Client
}
func NewRemoteClientServer ¶ added in v1.24.4
func NewRemoteClientServer(client Client) *RemoteClientServer
type RuntimeValuesContext ¶
type RuntimeValuesContext struct {
// contains filtered or unexported fields
}
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServerFromBackend ¶
func NewServerFromBackend(backend Backend) *Server
type Step ¶
type Step struct {
Name string `json:"name,omitempty"`
Function string `json:"function,omitempty"` // 任务所使用的 函数/组件/插件
Args []interface{} `json:"args,omitempty"` // 对应的参数
SubSteps []Step `json:"subSteps,omitempty"` // 子任务
Status *TaskStatus `json:"status,omitempty"`
}
type SubOption ¶
type SubOption func(o *SubOptions)
func WithAutoACK ¶
func WithAutoACK(ack bool) SubOption
func WithConcurrency ¶
func WithConcurrency(con int) SubOption
type SubOptions ¶
type SubOptions struct {
AutoACK bool // 自动确认,无论结果是否为 error
Concurrency int // 支持的并发数量
}
type Task ¶
type Task struct {
UID string `json:"uid,omitempty"`
Name string `json:"name,omitempty"` // 任务名称,例如 更新镜像,同步数据等。
Group string `json:"group,omitempty"` // 任务类型分组
Steps []Step `json:"steps,omitempty"`
CreationTimestamp metav1.Time `json:"creationTimestamp,omitempty"`
Addtionals map[string]string `json:"addtionals,omitempty"` // 额外信息
Status *TaskStatus `json:"status,omitempty"`
}
type TaskStatus ¶
type TaskStatus struct {
StartTimestamp metav1.Time `json:"startTimestamp,omitempty"`
FinishTimestamp metav1.Time `json:"finishTimestamp,omitempty"`
Status TaskStatusCode `json:"status,omitempty"`
Result []interface{} `json:"result,omitempty"`
Executer string `json:"executer,omitempty"`
Message string `json:"message,omitempty"`
}
type TaskStatusCode ¶
type TaskStatusCode string
const (
TaskStatusPending TaskStatusCode = "Pending"
TaskStatusRunning TaskStatusCode = "Running"
TaskStatusSuccess TaskStatusCode = "Success"
TaskStatusError TaskStatusCode = "Error"
)
Click to show internal directories.
Click to hide internal directories.