Documentation
¶
Index ¶
- type Config
- type ErrorHandler
- type ErrorHandlerFunc
- type GroupAggregator
- type GroupAggregatorFunc
- type Handler
- type LogLevel
- type Logger
- type Option
- type OptionType
- type RedisClientOpt
- type RedisClusterClientOpt
- type RedisConnOpt
- type RedisFailoverClientOpt
- type ResultWriter
- type RetryDelayFunc
- type Server
- type Task
- type TaskInfo
- type TaskState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Maximum number of concurrent processing of tasks. // // If set to a zero or negative value, NewServer will overwrite the value // to the number of CPUs usable by the current process. Concurrency int // BaseContext optionally specifies a function that returns the base context for Handler invocations on this server. // // If BaseContext is nil, the default is context.Background(). // If this is defined, then it MUST return a non-nil context BaseContext func() context.Context // Function to calculate retry delay for a failed task. // // By default, it uses exponential backoff algorithm to calculate the delay. RetryDelayFunc RetryDelayFunc // Predicate function to determine whether the error returned from Handler is a failure. // If the function returns false, Server will not increment the retried counter for the task, // and Server won't record the queue stats (processed and failed stats) to avoid skewing the error // rate of the queue. // // By default, if the given error is non-nil the function returns true. IsFailure func(error) bool // List of queues to process with given priority value. Keys are the names of the // queues and values are associated priority value. // // If set to nil or not specified, the server will process only the "default" queue. // // Priority is treated as follows to avoid starving low priority queues. // // Example: // // Queues: map[string]int{ // "critical": 6, // "default": 3, // "low": 1, // } // // With the above config and given that all queues are not empty, the tasks // in "critical", "default", "low" should be processed 60%, 30%, 10% of // the time respectively. // // If a queue has a zero or negative priority value, the queue will be ignored. Queues map[string]int // StrictPriority indicates whether the queue priority should be treated strictly. // // If set to true, tasks in the queue with the highest priority is processed first. // The tasks in lower priority queues are processed only when those queues with // higher priorities are empty. StrictPriority bool // ErrorHandler handles errors returned by the task handler. // // HandleError is invoked only if the task handler returns a non-nil error. // // Example: // // func reportError(ctx context, task *redisq.Task, err error) { // retried, _ := redisq.GetRetryCount(ctx) // maxRetry, _ := redisq.GetMaxRetry(ctx) // if retried >= maxRetry { // err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err) // } // errorReportingService.Notify(err) // }) // // ErrorHandler: redisq.ErrorHandlerFunc(reportError) ErrorHandler ErrorHandler // Logger specifies the logger used by the server instance. // // If unset, default logger is used. Logger Logger // LogLevel specifies the minimum log level to enable. // // If unset, InfoLevel is used by default. LogLevel LogLevel // ShutdownTimeout specifies the duration to wait to let workers finish their tasks // before forcing them to abort when stopping the server. // // If unset or zero, default timeout of 8 seconds is used. ShutdownTimeout time.Duration // HealthCheckFunc is called periodically with any errors encountered during ping to the // connected redis server. HealthCheckFunc func(error) // HealthCheckInterval specifies the interval between healthchecks. // // If unset or zero, the interval is set to 15 seconds. HealthCheckInterval time.Duration // DelayedTaskCheckInterval specifies the interval between checks run on 'scheduled' and 'retry' // tasks, and forwarding them to 'pending' state if they are ready to be processed. // // If unset or zero, the interval is set to 5 seconds. DelayedTaskCheckInterval time.Duration // GroupGracePeriod specifies the amount of time the server will wait for an incoming task before aggregating // the tasks in a group. If an incoming task is received within this period, the server will wait for another // period of the same length, up to GroupMaxDelay if specified. // // If unset or zero, the grace period is set to 1 minute. // Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to // NewServer will panic. GroupGracePeriod time.Duration // GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating // the tasks in a group. // // If unset or zero, no delay limit is used. GroupMaxDelay time.Duration // GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group. // If GroupMaxSize is reached, the server will aggregate the tasks into one immediately. // // If unset or zero, no size limit is used. GroupMaxSize int // GroupAggregator specifies the aggregation function used to aggregate multiple tasks in a group into one task. // // If unset or nil, the group aggregation feature will be disabled on the server. GroupAggregator GroupAggregator }
Config specifies the server's background-task processing behavior.
type ErrorHandler ¶
An ErrorHandler handles an error occurred during task processing.
type ErrorHandlerFunc ¶
The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as a ErrorHandler. If f is a function with the appropriate signature, ErrorHandlerFunc(f) is a ErrorHandler that calls f.
func (ErrorHandlerFunc) HandleError ¶
func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err error)
HandleError calls fn(ctx, task, err)
type GroupAggregator ¶
type GroupAggregator interface { // Aggregate aggregates the given tasks in a group with the given group name, // and returns a new task which is the aggregation of those tasks. // // Use NewTask(typename, payload, opts...) to set any options for the aggregated task. // The Queue option, if provided, will be ignored and the aggregated task will always be enqueued // to the same queue the group belonged. Aggregate(group string, tasks []*Task) *Task }
GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
type GroupAggregatorFunc ¶
The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator. If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f.
type Handler ¶
A Handler processes tasks.
ProcessTask should return nil if the processing of a task is successful.
If ProcessTask returns a non-nil error or panics, the task will be retried after delay if retry-count is remaining, otherwise the task will be archived.
One exception to this rule is when ProcessTask returns a SkipRetry error. If the returned error is SkipRetry or an error wraps SkipRetry, retry is skipped and the task will be immediately archived instead.
type LogLevel ¶
type LogLevel int32
LogLevel represents logging level.
It satisfies flag.Value interface.
const ( // DebugLevel is the lowest level of logging. // Debug logs are intended for debugging and development purposes. DebugLevel LogLevel // InfoLevel is used for general informational log messages. InfoLevel // WarnLevel is used for undesired but relatively expected events, // which may indicate a problem. WarnLevel // ErrorLevel is used for undesired and unexpected events that // the program can recover from. ErrorLevel // FatalLevel is used for undesired and unexpected events that // the program cannot recover from. FatalLevel )
type Logger ¶
type Logger interface { // Debug logs a message at Debug level. Debug(args ...interface{}) // Info logs a message at Info level. Info(args ...interface{}) // Warn logs a message at Warning level. Warn(args ...interface{}) // Error logs a message at Error level. Error(args ...interface{}) // Fatal logs a message at Fatal level // and process will exit with status set to 1. Fatal(args ...interface{}) }
Logger supports logging at various log levels.
type Option ¶
type Option interface { // String returns a string representation of the option. String() string // Type describes the type of the option. Type() OptionType // Value returns a value used to create this option. Value() interface{} }
Option specifies the task processing behavior.
type OptionType ¶
type OptionType int
const ( MaxRetryOpt OptionType = iota QueueOpt TimeoutOpt DeadlineOpt UniqueOpt ProcessAtOpt ProcessInOpt TaskIDOpt RetentionOpt GroupOpt )
type RedisClientOpt ¶
type RedisClientOpt struct { // Network type to use, either tcp or unix. // Default is tcp. Network string // Redis server address in "host:port" format. Addr string // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimout. WriteTimeout time.Duration // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config }
RedisClientOpt is used to create a redis client that connects to a redis server directly.
func (RedisClientOpt) MakeRedisClient ¶
func (opt RedisClientOpt) MakeRedisClient() interface{}
type RedisClusterClientOpt ¶
type RedisClusterClientOpt struct { // A seed list of host:port addresses of cluster nodes. Addrs []string // The maximum number of retries before giving up. // Command is retried on network errors and MOVED/ASK redirects. // Default is 8 retries. MaxRedirects int // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimeout. WriteTimeout time.Duration // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config }
RedisClusterClientOpt is used to creates a redis client that connects to redis cluster.
func (RedisClusterClientOpt) MakeRedisClient ¶
func (opt RedisClusterClientOpt) MakeRedisClient() interface{}
type RedisConnOpt ¶
type RedisConnOpt interface { // MakeRedisClient returns a new redis client instance. // Return value is intentionally opaque to hide the implementation detail of redis client. MakeRedisClient() interface{} }
RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.
RedisConnOpt represents a sum of following types:
- RedisClientOpt
- RedisFailoverClientOpt
- RedisClusterClientOpt
func ParseRedisURI ¶
func ParseRedisURI(uri string) (RedisConnOpt, error)
ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. It returns a non-nil error if uri cannot be parsed.
Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. Supported formats are:
redis://[:password@]host[:port][/dbnumber] rediss://[:password@]host[:port][/dbnumber] redis-socket://[:password@]path[?db=dbnumber] redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
type RedisFailoverClientOpt ¶
type RedisFailoverClientOpt struct { // Redis master name that monitored by sentinels. MasterName string // Addresses of sentinels in "host:port" format. // Use at least three sentinels to avoid problems described in // https://redis.io/topics/sentinel. SentinelAddrs []string // Redis sentinel password. SentinelPassword string // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimeout WriteTimeout time.Duration // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config }
RedisFailoverClientOpt is used to creates a redis client that talks to redis sentinels for service discovery and has an automatic failover capability.
func (RedisFailoverClientOpt) MakeRedisClient ¶
func (opt RedisFailoverClientOpt) MakeRedisClient() interface{}
type ResultWriter ¶
type ResultWriter struct {
// contains filtered or unexported fields
}
ResultWriter is a client interface to write result data for a task. It writes the data to the redis instance the server is connected to.
func (*ResultWriter) TaskID ¶
func (w *ResultWriter) TaskID() string
TaskID returns the ID of the task the ResultWriter is associated with.
type RetryDelayFunc ¶
RetryDelayFunc calculates the retry delay duration for a failed task given the retry count, error, and the task.
n is the number of times the task has been retried. e is the error returned by the task handler. t is the task in question.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is responsible for task processing and task lifecycle management.
Server pulls tasks off queues and processes them. If the processing of a task is unsuccessful, server will schedule it for a retry.
A task will be retried until either the task gets processed successfully or until it reaches its max retry count.
If a task exhausts its retries, it will be moved to the archive and will be kept in the archive set. Note that the archive size is finite and once it reaches its max size, oldest tasks in the archive will be deleted.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task represents a unit of work to be performed.
func NewTask ¶
NewTask returns a new Task given a type name and payload data. Options can be passed to configure task processing behavior.
func (*Task) ResultWriter ¶
func (t *Task) ResultWriter() *ResultWriter
ResultWriter returns a pointer to the ResultWriter associated with the task.
Nil pointer is returned if called on a newly created task (i.e. task created by calling NewTask). Only the tasks passed to Handler.ProcessTask have a valid ResultWriter pointer.
type TaskInfo ¶
type TaskInfo struct { // ID is the identifier of the task. ID string // Queue is the name of the queue in which the task belongs. Queue string // Type is the type name of the task. Type string // Payload is the payload data of the task. Payload []byte // State indicates the task state. State TaskState // MaxRetry is the maximum number of times the task can be retried. MaxRetry int // Retried is the number of times the task has retried so far. Retried int // LastErr is the error message from the last failure. LastErr string // LastFailedAt is the time time of the last failure if any. // If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}). LastFailedAt time.Time // Timeout is the duration the task can be processed by Handler before being retried, // zero if not specified Timeout time.Duration // Deadline is the deadline for the task, zero value if not specified. Deadline time.Time // Group is the name of the group in which the task belongs. // // Tasks in the same queue can be grouped together by Group name and will be aggregated into one task // by a Server processing the queue. // // Empty string (default) indicates task does not belong to any groups, and no aggregation will be applied to the task. Group string // NextProcessAt is the time the task is scheduled to be processed, // zero if not applicable. NextProcessAt time.Time // IsOrphaned describes whether the task is left in active state with no worker processing it. // An orphaned task indicates that the worker has crashed or experienced network failures and was not able to // extend its lease on the task. // // This task will be recovered by running a server against the queue the task is in. // This field is only applicable to tasks with TaskStateActive. IsOrphaned bool // Retention is duration of the retention period after the task is successfully processed. Retention time.Duration // CompletedAt is the time when the task is processed successfully. // Zero value (i.e. time.Time{}) indicates no value. CompletedAt time.Time // Result holds the result data associated with the task. // Use ResultWriter to write result data from the Handler. Result []byte }
A TaskInfo describes a task and its metadata.
type TaskState ¶
type TaskState int
TaskState denotes the state of a task.
const ( // Indicates that the task is currently being processed by Handler. TaskStateActive TaskState = iota + 1 // Indicates that the task is ready to be processed by Handler. TaskStatePending // Indicates that the task is scheduled to be processed some time in the future. TaskStateScheduled // Indicates that the task has previously failed and scheduled to be processed some time in the future. TaskStateRetry // Indicates that the task is archived and stored for inspection purposes. TaskStateArchived // Indicates that the task is processed successfully and retained until the retention TTL expires. TaskStateCompleted // Indicates that the task is waiting in a group to be aggregated into one task. TaskStateAggregating )
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
internal
|
|
log
Package log exports logging related types and functions.
|
Package log exports logging related types and functions. |
timeutil
Package timeutil exports functions and types related to time and date.
|
Package timeutil exports functions and types related to time and date. |