Documentation
¶
Index ¶
- Constants
- Variables
- func ActivityTypePtr(v s.ActivityType) *s.ActivityType
- func AddSecondsToBaseTime(baseTimeInNanoSec int64, durationInSeconds int64) int64
- func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption
- func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus
- func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool
- func BoolDefault(v *bool) bool
- func BoolPtr(v bool) *bool
- func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause
- func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, ...) error
- func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy
- func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause
- func CreateAdminServiceRetryPolicy() backoff.RetryPolicy
- func CreateBlobstoreClientRetryPolicy() backoff.RetryPolicy
- func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy
- func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy
- func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest
- func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy
- func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, ...) *m.PollForDecisionTaskResponse
- func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy
- func CreatePersistanceRetryPolicy() backoff.RetryPolicy
- func CreatePublicClientRetryPolicy() backoff.RetryPolicy
- func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause
- func DecisionTypePtr(t s.DecisionType) *s.DecisionType
- func EventTypePtr(t s.EventType) *s.EventType
- func Float64Ptr(v float64) *float64
- func GenerateRandomString(n int) string
- func Int32Default(v *int32) int32
- func Int32Ptr(v int32) *int32
- func Int64Default(v *int64) int64
- func Int64Ptr(v int64) *int64
- func IntPtr(v int) *int
- func IsBlobstoreNonRetryableError(err error) bool
- func IsBlobstoreTransientError(err error) bool
- func IsKafkaTransientError(err error) bool
- func IsPersistenceTransientError(err error) bool
- func IsServiceNonRetryableError(err error) bool
- func IsServiceTransientError(err error) bool
- func IsValidContext(ctx context.Context) error
- func IsWhitelistServiceTransientError(err error) bool
- func MinInt32(a, b int32) int32
- func PrettyPrintHistory(history *workflow.History, logger bark.Logger)
- func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause
- func StringDefault(v *string) string
- func StringPtr(v string) *string
- func TDeserialize(msg thrift.TStruct, b []byte) (err error)
- func TDeserializeString(msg thrift.TStruct, s string) (err error)
- func TListDeserialize(msgType reflect.Type, b []byte) (msgs []thrift.TStruct, err error)
- func TListSerialize(msgs []thrift.TStruct) (b []byte, err error)
- func TSerialize(msg thrift.TStruct) (b []byte, err error)
- func TSerializeString(msg thrift.TStruct) (s string, err error)
- func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind
- func TaskListPtr(v s.TaskList) *s.TaskList
- func TaskListTypePtr(t s.TaskListType) *s.TaskListType
- func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType
- func Uint32Ptr(v uint32) *uint32
- func Uint64Ptr(v uint64) *uint64
- func ValidateCronSchedule(cronSchedule string) error
- func ValidateRetryPolicy(policy *workflow.RetryPolicy) error
- func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int
- func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType
- type ClientCache
- type Daemon
- type EncodingType
- type EventTimeSource
- type PProfInitializer
- type PriorityTokenBucket
- type QueryTaskToken
- type RPCFactory
- type RealTimeSource
- type TaskToken
- type TaskTokenSerializer
- type TimeSource
- type TokenBucket
- type TokenBucketFactory
Constants ¶
const (
// FirstEventID is the id of the first event in the history
FirstEventID int64 = 1
// EmptyEventID is the id of the empty event
EmptyEventID int64 = -23
// EmptyVersion is used as the default value for failover version when no value is provided
EmptyVersion int64 = -24
// EndEventID is the id of the end event, here we use the int64 max
EndEventID int64 = 1<<63 - 1
// BufferedEventID is the id of the buffered event
BufferedEventID int64 = -123
// EmptyEventTaskID is uninitialized id of the task id within event
EmptyEventTaskID int64 = -1234
// TransientEventID is the id of the transient event
TransientEventID int64 = -124
// FirstBlobPageToken is the page token identifying the first blob for each history archival
FirstBlobPageToken = 1
// LastBlobNextPageToken is the next page token on the last blob for each history archival
LastBlobNextPageToken = -1
)
const (
// FrontendServiceName is the name of the frontend service
FrontendServiceName = "cadence-frontend"
// HistoryServiceName is the name of the history service
HistoryServiceName = "cadence-history"
// MatchingServiceName is the name of the matching service
MatchingServiceName = "cadence-matching"
// WorkerServiceName is the name of the worker service
WorkerServiceName = "cadence-worker"
)
const (
EncodingTypeJSON EncodingType = "json"
EncodingTypeThriftRW = "thriftrw"
EncodingTypeGob = "gob"
EncodingTypeUnknown = "unknow"
)
Data encoding types
const (
// GetHistoryWarnSizeLimit is the threshold for emitting warn log
GetHistoryWarnSizeLimit = 500 * 1024 // Warn when size goes over 500KB
// GetHistoryMaxPageSize is the max page size for get history
GetHistoryMaxPageSize = 1000
)
const (
// DaemonStatusInitialized coroutine pool initialized
DaemonStatusInitialized int32 = 0
// DaemonStatusStarted coroutine pool started
DaemonStatusStarted int32 = 1
// DaemonStatusStopped coroutine pool stopped
DaemonStatusStopped int32 = 2
)
const (
// LibraryVersionHeaderName refers to the name of the
// tchannel / http header that contains the client
// library version
LibraryVersionHeaderName = "cadence-client-library-version"
// FeatureVersionHeaderName refers to the name of the
// tchannel / http header that contains the client
// feature version
// the feature version sent from client represents the
// feature set of the cadence client library support.
// This can be used for client capability check, on
// Cadence server, for backward compatibility
FeatureVersionHeaderName = "cadence-client-feature-version"
// ClientImplHeaderName refers to the name of the
// header that contains the client implementation
ClientImplHeaderName = "cadence-client-name"
)
const (
// FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit
FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT"
// FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit
FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT"
// FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit
FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT"
//FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit
FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT"
// FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit
FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT"
// TerminateReasonSizeExceedsLimit is the reason to terminate a workflow when its history size exceeds the limit
TerminateReasonSizeExceedsLimit = "HISTORY_SIZE_EXCEEDS_LIMIT"
)
const MaxTaskTimeout = 31622400
MaxTaskTimeout is maximum task timeout allowed. 366 days in seconds
const NoRetryBackoff = time.Duration(-1)
NoRetryBackoff is used to represent backoff when no retry is needed
const (
// VisibilityAppName is used to find kafka topics and ES indexName for visibility
VisibilityAppName = "visibility"
)
Variables ¶
var (
// ErrBlobSizeExceedsLimit is error for event blob size exceeds limit
ErrBlobSizeExceedsLimit = &workflow.BadRequestError{Message: "Blob data size exceeds limit."}
)
Functions ¶
func ActivityTypePtr ¶
func ActivityTypePtr(v s.ActivityType) *s.ActivityType
ActivityTypePtr makes a copy and returns the pointer to an ActivityType.
func AddSecondsToBaseTime ¶
func AddSecondsToBaseTime(baseTimeInNanoSec int64, durationInSeconds int64) int64
AddSecondsToBaseTime - Gets the UnixNano with given duration and base time.
func AggregateYarpcOptions ¶ added in v0.3.5
func AggregateYarpcOptions(ctx context.Context, opts ...yarpc.CallOption) []yarpc.CallOption
AggregateYarpcOptions aggregates the header information from the context into the existing yarpc call options
func ArchivalStatusPtr ¶ added in v0.5.0
func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus
ArchivalStatusPtr makes a copy and returns the pointer to an ArchivalStatus.
func AwaitWaitGroup ¶
func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool
AwaitWaitGroup calls Wait on the given wait group. It returns true if the Wait() call returned before the timeout, and false if it did not return before the timeout.
func BoolDefault ¶ added in v0.3.2
func BoolDefault(v *bool) bool
BoolDefault returns value if bool pointer is set otherwise default value of bool
func CancelExternalWorkflowExecutionFailedCausePtr ¶ added in v0.3.2
func CancelExternalWorkflowExecutionFailedCausePtr(t s.CancelExternalWorkflowExecutionFailedCause) *s.CancelExternalWorkflowExecutionFailedCause
CancelExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a CancelExternalWorkflowExecutionFailedCause.
func CheckEventBlobSizeLimit ¶ added in v0.5.0
func CheckEventBlobSizeLimit(actualSize, warnLimit, errorLimit int, domainID, workflowID, runID string, metricsClient metrics.Client, scope int, logger bark.Logger) error
CheckEventBlobSizeLimit checks if blob data exceeds limits. It logs a warning if the size exceeds warnLimit, and returns ErrBlobSizeExceedsLimit if it exceeds errorLimit.
func ChildPolicyPtr ¶ added in v0.3.2
func ChildPolicyPtr(t s.ChildPolicy) *s.ChildPolicy
ChildPolicyPtr makes a copy and returns the pointer to a ChildPolicy.
func ChildWorkflowExecutionFailedCausePtr ¶ added in v0.3.2
func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause) *s.ChildWorkflowExecutionFailedCause
ChildWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a ChildWorkflowExecutionFailedCause.
func CreateAdminServiceRetryPolicy ¶ added in v0.5.0
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy
CreateAdminServiceRetryPolicy creates a retry policy for calls to admin service
func CreateBlobstoreClientRetryPolicy ¶ added in v0.5.2
func CreateBlobstoreClientRetryPolicy() backoff.RetryPolicy
CreateBlobstoreClientRetryPolicy creates a retry policy for blobstore client
func CreateFrontendServiceRetryPolicy ¶ added in v0.3.11
func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy
CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service
func CreateHistoryServiceRetryPolicy ¶
func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy
CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service
func CreateHistoryStartWorkflowRequest ¶ added in v0.4.0
func CreateHistoryStartWorkflowRequest(domainID string, startRequest *workflow.StartWorkflowExecutionRequest) *h.StartWorkflowExecutionRequest
CreateHistoryStartWorkflowRequest creates a start workflow request for history
func CreateKafkaOperationRetryPolicy ¶ added in v0.5.2
func CreateKafkaOperationRetryPolicy() backoff.RetryPolicy
CreateKafkaOperationRetryPolicy creates a retry policy for kafka operation
func CreateMatchingPollForDecisionTaskResponse ¶ added in v0.3.12
func CreateMatchingPollForDecisionTaskResponse(historyResponse *h.RecordDecisionTaskStartedResponse, workflowExecution *workflow.WorkflowExecution, token []byte) *m.PollForDecisionTaskResponse
CreateMatchingPollForDecisionTaskResponse creates the response for matching's PollForDecisionTask
func CreateMatchingServiceRetryPolicy ¶ added in v0.5.0
func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy
CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service
func CreatePersistanceRetryPolicy ¶
func CreatePersistanceRetryPolicy() backoff.RetryPolicy
CreatePersistanceRetryPolicy creates a retry policy for persistence layer operations
func CreatePublicClientRetryPolicy ¶ added in v0.5.2
func CreatePublicClientRetryPolicy() backoff.RetryPolicy
CreatePublicClientRetryPolicy creates a retry policy for calls to frontend service
func DecisionTaskFailedCausePtr ¶ added in v0.3.2
func DecisionTaskFailedCausePtr(t s.DecisionTaskFailedCause) *s.DecisionTaskFailedCause
DecisionTaskFailedCausePtr makes a copy and returns the pointer to a DecisionTaskFailedCause.
func DecisionTypePtr ¶
func DecisionTypePtr(t s.DecisionType) *s.DecisionType
DecisionTypePtr makes a copy and returns the pointer to a DecisionType.
func EventTypePtr ¶
func EventTypePtr(t s.EventType) *s.EventType
EventTypePtr makes a copy and returns the pointer to an EventType.
func Float64Ptr ¶
func Float64Ptr(v float64) *float64
Float64Ptr makes a copy and returns the pointer to a float64.
func GenerateRandomString ¶ added in v0.3.12
func GenerateRandomString(n int) string
GenerateRandomString is used to generate a random string of length n for tests
func Int32Default ¶ added in v0.3.2
func Int32Default(v *int32) int32
Int32Default returns value if int32 pointer is set otherwise default value of int32
func Int32Ptr ¶
func Int32Ptr(v int32) *int32
Int32Ptr makes a copy and returns the pointer to an int32.
func Int64Default ¶ added in v0.3.2
func Int64Default(v *int64) int64
Int64Default returns value if int64 pointer is set otherwise default value of int64
func Int64Ptr ¶
func Int64Ptr(v int64) *int64
Int64Ptr makes a copy and returns the pointer to an int64.
func IsBlobstoreNonRetryableError ¶ added in v0.5.2
func IsBlobstoreNonRetryableError(err error) bool
IsBlobstoreNonRetryableError checks if the error is a non retryable error.
func IsBlobstoreTransientError ¶ added in v0.5.2
func IsBlobstoreTransientError(err error) bool
IsBlobstoreTransientError checks if the error is a retryable error.
func IsKafkaTransientError ¶ added in v0.5.2
func IsKafkaTransientError(err error) bool
IsKafkaTransientError checks if the error is a transient kafka error
func IsPersistenceTransientError ¶
func IsPersistenceTransientError(err error) bool
IsPersistenceTransientError checks if the error is a transient persistence error
func IsServiceNonRetryableError ¶
func IsServiceNonRetryableError(err error) bool
IsServiceNonRetryableError checks if the error is a non retryable error.
func IsServiceTransientError ¶ added in v0.3.11
func IsServiceTransientError(err error) bool
IsServiceTransientError checks if the error is a retryable error.
func IsValidContext ¶
func IsValidContext(ctx context.Context) error
IsValidContext checks that the thrift context is not expired or cancelled. Returns nil if the context is still valid. Otherwise, returns the result of ctx.Err()
func IsWhitelistServiceTransientError ¶ added in v0.3.14
func IsWhitelistServiceTransientError(err error) bool
IsWhitelistServiceTransientError checks if the error is a transient error.
func MinInt32 ¶ added in v0.3.14
func MinInt32(a, b int32) int32
MinInt32 returns the smaller of two int32 inputs
func PrettyPrintHistory ¶
func PrettyPrintHistory(history *workflow.History, logger bark.Logger)
PrettyPrintHistory prints history in human readable format
func SignalExternalWorkflowExecutionFailedCausePtr ¶ added in v0.3.6
func SignalExternalWorkflowExecutionFailedCausePtr(t s.SignalExternalWorkflowExecutionFailedCause) *s.SignalExternalWorkflowExecutionFailedCause
SignalExternalWorkflowExecutionFailedCausePtr makes a copy and returns the pointer to a SignalExternalWorkflowExecutionFailedCause.
func StringDefault ¶ added in v0.3.2
func StringDefault(v *string) string
StringDefault returns value if string pointer is set otherwise default value of string
func StringPtr ¶
func StringPtr(v string) *string
StringPtr makes a copy and returns the pointer to a string.
func TDeserialize ¶
func TDeserialize(msg thrift.TStruct, b []byte) (err error)
TDeserialize is used to deserialize []byte to thrift TStruct
func TDeserializeString ¶
func TDeserializeString(msg thrift.TStruct, s string) (err error)
TDeserializeString is used to deserialize string to thrift TStruct
func TListDeserialize ¶
func TListDeserialize(msgType reflect.Type, b []byte) (msgs []thrift.TStruct, err error)
TListDeserialize is used to deserialize []byte to list of thrift TStruct
func TListSerialize ¶
func TListSerialize(msgs []thrift.TStruct) (b []byte, err error)
TListSerialize is used to serialize list of thrift TStruct to []byte
func TSerialize ¶
func TSerialize(msg thrift.TStruct) (b []byte, err error)
TSerialize is used to serialize thrift TStruct to []byte
func TSerializeString ¶
func TSerializeString(msg thrift.TStruct) (s string, err error)
TSerializeString is used to serialize thrift TStruct to string
func TaskListKindPtr ¶ added in v0.3.6
func TaskListKindPtr(t s.TaskListKind) *s.TaskListKind
TaskListKindPtr makes a copy and returns the pointer to a TaskListKind.
func TaskListPtr ¶
func TaskListPtr(v s.TaskList) *s.TaskList
TaskListPtr makes a copy and returns the pointer to a TaskList.
func TaskListTypePtr ¶ added in v0.4.0
func TaskListTypePtr(t s.TaskListType) *s.TaskListType
TaskListTypePtr makes a copy and returns the pointer to a TaskListType.
func TimeoutTypePtr ¶ added in v0.3.2
func TimeoutTypePtr(t s.TimeoutType) *s.TimeoutType
TimeoutTypePtr makes a copy and returns the pointer to a TimeoutType.
func Uint32Ptr ¶
func Uint32Ptr(v uint32) *uint32
Uint32Ptr makes a copy and returns the pointer to a uint32.
func Uint64Ptr ¶
func Uint64Ptr(v uint64) *uint64
Uint64Ptr makes a copy and returns the pointer to a uint64.
func ValidateCronSchedule ¶ added in v0.5.0
func ValidateCronSchedule(cronSchedule string) error
ValidateCronSchedule validates a cron schedule spec
func ValidateRetryPolicy ¶ added in v0.4.0
func ValidateRetryPolicy(policy *workflow.RetryPolicy) error
ValidateRetryPolicy validates a retry policy
func WorkflowIDToHistoryShard ¶
func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int
WorkflowIDToHistoryShard is used to map workflowID to a shardID
func WorkflowTypePtr ¶
func WorkflowTypePtr(t s.WorkflowType) *s.WorkflowType
WorkflowTypePtr makes a copy and returns the pointer to a WorkflowType.
Types ¶
type ClientCache ¶ added in v0.5.0
type ClientCache interface {
GetClientForKey(key string) (interface{}, error)
GetClientForClientKey(clientKey string) (interface{}, error)
}
ClientCache stores initialized clients
func NewClientCache ¶ added in v0.5.0
func NewClientCache(
keyResolver keyResolver,
clientProvider clientProvider,
) ClientCache
NewClientCache creates a new client cache based on membership
type Daemon ¶
type Daemon interface {
Start()
Stop()
}
Daemon is the base interface implemented by background tasks within cherami
type EncodingType ¶
type EncodingType string
EncodingType is an enum that represents various data encoding types
type EventTimeSource ¶ added in v0.3.14
type EventTimeSource struct {
// contains filtered or unexported fields
}
EventTimeSource serves fake controlled time
func NewEventTimeSource ¶ added in v0.3.14
func NewEventTimeSource() *EventTimeSource
NewEventTimeSource returns a time source that serves fake controlled time
type PProfInitializer ¶ added in v0.3.5
type PProfInitializer interface {
Start() error
}
PProfInitializer initializes pprof based on config
type PriorityTokenBucket ¶ added in v0.4.0
type PriorityTokenBucket interface {
// GetToken attempts to take count tokens from the
// bucket with that priority. Priority 0 is highest.
// Returns true on success, false
// otherwise along with the duration for the next refill
GetToken(priority, count int) (bool, time.Duration)
}
PriorityTokenBucket is the interface for rate limiter with priority
func NewFullPriorityTokenBucket ¶ added in v0.4.0
func NewFullPriorityTokenBucket(numOfPriority, rps int, timeSource TimeSource) PriorityTokenBucket
NewFullPriorityTokenBucket creates and returns a new priority token bucket with all buckets initialized with full tokens. With all buckets full, requests for tokens from low priority buckets won't be missed initially, but this may cause bursts.
func NewPriorityTokenBucket ¶ added in v0.4.0
func NewPriorityTokenBucket(numOfPriority, rps int, timeSource TimeSource) PriorityTokenBucket
NewPriorityTokenBucket creates and returns a new token bucket rate limiter that supports priorities. There are n buckets for n priorities. It replenishes the top priority bucket every 100 milliseconds; unused tokens flow to the next bucket. The idea comes from Dual Token Bucket Algorithms. Thread safe.
@param numOfPriority
Number of priorities
@param rps
Desired rate per second
type QueryTaskToken ¶ added in v0.3.2
type QueryTaskToken struct {
DomainID string `json:"domainId"`
TaskList string `json:"taskList"`
TaskID string `json:"taskId"`
}
QueryTaskToken identifies a query task
type RPCFactory ¶ added in v0.3.2
type RPCFactory interface {
CreateDispatcher() *yarpc.Dispatcher
CreateDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher
}
RPCFactory creates a dispatcher that knows how to transport requests.
type RealTimeSource ¶ added in v0.3.12
type RealTimeSource struct{}
RealTimeSource serves real wall-clock time
func NewRealTimeSource ¶
func NewRealTimeSource() *RealTimeSource
NewRealTimeSource returns a time source that serves real wall clock time
type TaskToken ¶
type TaskToken struct {
DomainID string `json:"domainId"`
WorkflowID string `json:"workflowId"`
RunID string `json:"runId"`
ScheduleID int64 `json:"scheduleId"`
ScheduleAttempt int64 `json:"scheduleAttempt"`
ActivityID string `json:"activityId"`
}
TaskToken identifies a task
type TaskTokenSerializer ¶
type TaskTokenSerializer interface {
Serialize(token *TaskToken) ([]byte, error)
Deserialize(data []byte) (*TaskToken, error)
SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error)
DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error)
}
TaskTokenSerializer serializes task tokens
func NewJSONTaskTokenSerializer ¶
func NewJSONTaskTokenSerializer() TaskTokenSerializer
NewJSONTaskTokenSerializer creates a new instance of TaskTokenSerializer
type TimeSource ¶
type TimeSource interface {
Now() time.Time
}
TimeSource is an interface for any entity that provides the current time. It's primarily used to mock out time sources in unit tests
type TokenBucket ¶
type TokenBucket interface {
// TryConsume attempts to take count tokens from the
// bucket. Returns true on success, false
// otherwise along with the duration for the next refill
TryConsume(count int) (bool, time.Duration)
// Consume waits up to timeout duration to take count
// tokens from the bucket. Returns true if count
// tokens were acquired before timeout, false
// otherwise
Consume(count int, timeout time.Duration) bool
}
TokenBucket is the interface for any implementation of a token bucket rate limiter
func NewTokenBucket ¶
func NewTokenBucket(rps int, timeSource TimeSource) TokenBucket
NewTokenBucket creates and returns a new token bucket rate limiter that replenishes the bucket every 100 milliseconds. Thread safe.
@param rps
Desired rate per second
Golang.org has an alternative implementation of the rate limiter. On benchmarking, golang's implementation was an order of magnitude slower. In addition, it does a lot more than what we need. These are the benchmarks under different scenarios:
BenchmarkTokenBucketParallel      50000000    40.7 ns/op
BenchmarkGolangRateParallel       10000000    150 ns/op
BenchmarkTokenBucketParallel-8    20000000    124 ns/op
BenchmarkGolangRateParallel-8     10000000    208 ns/op
BenchmarkTokenBucketParallel      50000000    37.8 ns/op
BenchmarkGolangRateParallel       10000000    153 ns/op
BenchmarkTokenBucketParallel-8    10000000    129 ns/op
BenchmarkGolangRateParallel-8     10000000    208 ns/op
type TokenBucketFactory ¶
type TokenBucketFactory interface {
CreateTokenBucket(rps int, timeSource TimeSource) TokenBucket
}
TokenBucketFactory is an interface mainly used for injecting mock implementation of TokenBucket for unit testing
func NewTokenBucketFactory ¶
func NewTokenBucketFactory() TokenBucketFactory
NewTokenBucketFactory creates an instance of factory used for creating TokenBucket instances