Documentation
¶
Index ¶
- type BucketFunc
- type BucketKey
- type ChainThrottler
- type GetSyncLimit
- type IsWorkflowDeleted
- type Key
- type LockKind
- type LockName
- type Manager
- func (cm *Manager) CheckWorkflowExistence()
- func (cm *Manager) Initialize(wfs []wfv1.Workflow)
- func (cm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization)
- func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool
- func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, error)
- type NextWorkflow
- type PriorityMutex
- type PrioritySemaphore
- type QueueFunc
- type Semaphore
- type Throttler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BucketFunc ¶
type BucketFunc func(Key) BucketKey
var NamespaceBucket BucketFunc = func(key Key) BucketKey {
namespace, _, _ := cache.SplitMetaNamespaceKey(key)
return namespace
}
var SingleBucket BucketFunc = func(key Key) BucketKey { return "" }
type ChainThrottler ¶
type ChainThrottler []Throttler
type GetSyncLimit ¶
type GetSyncLimit func(string) (int, error)
type IsWorkflowDeleted ¶
type IsWorkflowDeleted func(string) bool
type LockKind ¶
type LockKind string
const (
LockKindConfigMap LockKind = "ConfigMap"
LockKindMutex LockKind = "Mutex"
)
type LockName ¶
type LockName struct {
Namespace string
ResourceName string
Key string
Kind LockKind
}
func DecodeLockName ¶
func DecodeLockName(lockName string) (*LockName, error)
func GetLockName ¶
func GetLockName(sync *v1alpha1.Synchronization, namespace string) (*LockName, error)
func NewLockName ¶
func NewLockName(namespace, resourceName, lockKey string, kind LockKind) *LockName
func (*LockName) EncodeName ¶
func (ln *LockName) EncodeName() string
func (*LockName) ValidateEncoding ¶
func (ln *LockName) ValidateEncoding(encoding string) string
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewLockManager ¶
func NewLockManager(getSyncLimit GetSyncLimit, nextWorkflow NextWorkflow, isWFDeleted IsWorkflowDeleted) *Manager
func (*Manager) CheckWorkflowExistence ¶
func (cm *Manager) CheckWorkflowExistence()
func (*Manager) Initialize ¶
func (cm *Manager) Initialize(wfs []wfv1.Workflow)
func (*Manager) Release ¶
func (cm *Manager) Release(wf *wfv1.Workflow, nodeName string, syncRef *wfv1.Synchronization)
func (*Manager) ReleaseAll ¶
func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool
func (*Manager) TryAcquire ¶
func (cm *Manager) TryAcquire(wf *wfv1.Workflow, nodeName string, syncLockRef *wfv1.Synchronization) (bool, bool, string, error)
TryAcquire tries to acquire the lock from semaphore. It returns status of acquiring a lock , status of Workflow status updated, waiting message if lock is not available and any error encountered
type NextWorkflow ¶
type NextWorkflow func(string)
type PriorityMutex ¶
type PriorityMutex struct {
// contains filtered or unexported fields
}
type PrioritySemaphore ¶
type PrioritySemaphore struct {
// contains filtered or unexported fields
}
func NewSemaphore ¶
func NewSemaphore(name string, limit int, nextWorkflow NextWorkflow, lockType string) *PrioritySemaphore
type Throttler ¶
type Throttler interface {
Init(wfs []wfv1.Workflow) error
Add(key Key, priority int32, creationTime time.Time)
// Admit returns if the item should be processed.
Admit(key Key) bool
// Remove notifies throttler that item processing is no longer needed
Remove(key Key)
}
Throttler allows the controller to limit number of items it is processing in parallel. Items are processed in priority order, and one processing starts, other items (including higher-priority items) will be kept pending until the processing is complete. Implementations should be idempotent.
func NewThrottler ¶
func NewThrottler(parallelism int, bucketFunc BucketFunc, queue QueueFunc) Throttler
NewThrottler returns a throttle that only runs `parallelism` items at once. When an item may need processing, `queue` is invoked.
Source Files
¶
Click to show internal directories.
Click to hide internal directories.