Documentation ¶
Overview ¶
Package controllerruntime is a minimal framework for implementing Kumori controllers in which a NATS resource is the primary resource (the target state) and NATS and/or Kube resources are the secondary resources (on which actions are taken to reach that target state). External resources can be managed too, but the framework offers little help with them: unexpected changes in external resources are not detected, so the controller itself must decide how to handle them (for example, by detecting them in periodic resync operations). The design is based on IoC (inversion of control): the controller implements portions of code, but the flow of control (the reconciliation loop) resides in the framework. Basically:
- The controller must implement an interface (IController), and instantiate the framework (a ControllerRuntime object).
- The ControllerRuntime is in charge of detecting changes in the target state and invoking the controller functions responsible for reaching it. A Kubernetes workqueue is used to keep track of primary resources pending review, as well as reconciliation retries.
About the workqueue flow:
                  |
                  |new key
                  |
       +----------|----------+
       |                     | key
    +-----  workqueue.get()  -<--------------------+
    |  |                     |                     |
    |  +----------|----------+                     |
    |             |key                             |
    |             |                                |
    |  +----------|----------+          +----------|----------+
    |  |                     |          |                     |
    |  |     valid item?     |          |   workqueue.add()   |
    |  |                     |          |  [with rate limit]  |
    |  +----------|----------+          |                     |
    |             |yes                  +----------^----------+
    |no           |                                |
    |  +----------|----------+                     |
    |  |                     |                     |
    |  |    syncHandler()    | error               |
    |  |    [conciliate]     ----------------------+
    |  |                     |
    |  +----------|----------+
    |             |
    |             | ok
    |             |
    |  +----------|----------+
    |  |                     |
    +->|  workqueue.forget() |
       |                     |
       +----------|----------+
                  |
       +----------|----------+
       |                     |
       |  workqueue.done()   |
       |                     |
       +---------------------+
Index ¶
- Constants
- type ControllerRuntime
- func (c ...) DecomposeKubeSecondaryResource(obj any) (secondaryResource metav1.Object, primaryKey string, err error)
- func (c ...) EnqueueKey(key string, operation Operation)
- func (c ...) EnqueueKeyAfter(key string, operation Operation, delay time.Duration)
- func (c ...) Event(object runtime.Object, eventtype string, reason string, message string)
- func (c ...) Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)
- func (c ...) SetCustomController(customController IController)
- type IController
- type Operation
Constants ¶
const (
	ReasonResourceSynced         = "Synced"
	ReasonErrResourceExists      = "ErrResourceExists"
	MessageErrResourceExists     = "Resource %q already exists and is not managed by this controller"
	MessageResourceSyncedCreated = "%q synced successfully (created)"
	MessageResourceSyncedUpdated = "%q synced successfully (updated)"
	MessageResourceSyncedDeleted = "%q synced successfully (deleted)"
)
Messages used with Kubernetes events
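The message constants contain a `%q` verb, so they are presumably meant to be filled in with a resource name. A minimal sketch of that usage follows. The helper `syncedMessage` and the sample name are hypothetical, and the two constants are copied locally only to keep the example self-contained.

```go
package main

import "fmt"

// Copied from the constants above so the example compiles on its own.
const (
	ReasonResourceSynced         = "Synced"
	MessageResourceSyncedCreated = "%q synced successfully (created)"
)

// syncedMessage is a hypothetical helper that fills the %q verb with the
// resource name; %q adds the surrounding double quotes.
func syncedMessage(name string) string {
	return fmt.Sprintf(MessageResourceSyncedCreated, name)
}

func main() {
	// prints: Synced: "my-deployment" synced successfully (created)
	fmt.Println(ReasonResourceSynced+":", syncedMessage("my-deployment"))
}
```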
const (
	OwnerKeyLabel       = "kumori/ownerkey"
	OwnerkindLabel      = "kumori/ownerkind"
	OwnerKeyAnnotation  = "kumori/ownerkey"
	OwnerkindAnnotation = "kumori/ownerkind"
)
Labels and annotations set on secondary objects
Types ¶
type ControllerRuntime ¶
type ControllerRuntime[
	TNatsPrim entityv1.IEntity,
	TNatsSec1 entityv1.IEntity,
	TNatsSec2 entityv1.IEntity,
	TNatsSec3 entityv1.IEntity,
	TNatsSec4 entityv1.IEntity,
	TNatsTer1 entityv1.IEntity,
	TNatsTer2 entityv1.IEntity,
	TNatsTer3 entityv1.IEntity,
	TNatsTer4 entityv1.IEntity,
] struct {
	// contains filtered or unexported fields
}
ControllerRuntime is the base to implement a controller working in the way described in ...
https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
https://github.com/kubernetes/sample-controller
... but modified to take into account primary resources stored in a NATS store.
It means:
- There is a primary resource, whose Status you will be updating. In this case, the primary resource MUST be a NATS object.
- There are secondary resources, which CAN be Kubernetes/etcd objects or NATS objects, to be created/updated/deleted.
- There are tertiary resources. They are like secondaries, but they are only read to obtain additional information.
- All the secondary resources created will have a reference to the primary NATS resource (equivalent to the standard ownerReferences metadata used for the Kubernetes parent-child relationship).
func NewControllerRuntime ¶
func NewControllerRuntime[
	TNatsPrim entityv1.IEntity,
	TNatsSec1 entityv1.IEntity,
	TNatsSec2 entityv1.IEntity,
	TNatsSec3 entityv1.IEntity,
	TNatsSec4 entityv1.IEntity,
	TNatsTer1 entityv1.IEntity,
	TNatsTer2 entityv1.IEntity,
	TNatsTer3 entityv1.IEntity,
	TNatsTer4 entityv1.IEntity,
](
	name string,
	kind string,
	kubeClientset kubeclientset.Interface,
	primaryNatsClient entityv1.IEntityClient[TNatsPrim],
	secondarySharedInformers []cache.SharedIndexInformer,
	terciarySharedInformers []cache.SharedIndexInformer,
	secondaryNatsClient1 entityv1.IEntityClient[TNatsSec1],
	secondaryNatsClient2 entityv1.IEntityClient[TNatsSec2],
	secondaryNatsClient3 entityv1.IEntityClient[TNatsSec3],
	secondaryNatsClient4 entityv1.IEntityClient[TNatsSec4],
	terciaryNatsClient1 entityv1.IEntityClient[TNatsTer1],
	terciaryNatsClient2 entityv1.IEntityClient[TNatsTer2],
	terciaryNatsClient3 entityv1.IEntityClient[TNatsTer3],
	terciaryNatsClient4 entityv1.IEntityClient[TNatsTer4],
	secondaryExternalClients []externalv1.IExternalClient,
	resyncInterval time.Duration,
) (
	controllerRuntime *ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4],
)
NewControllerRuntime returns a ControllerRuntime object
func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) DecomposeKubeSecondaryResource ¶
func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) DecomposeKubeSecondaryResource(
	obj any,
) (
	secondaryResource metav1.Object,
	primaryKey string,
	err error,
)
DecomposeKubeSecondaryResource takes a generic any reference and returns the secondary kube resource together with the key of the related primary resource (its owner, if one exists).
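To illustrate the idea of decomposing a secondary object, here is a minimal sketch that type-asserts a generic value and reads the owner key from its labels. It is not the framework's implementation. The `labeled` interface is a hypothetical stand-in for metav1.Object, `fakeDeployment` and the key value `"ns/demo"` are invented for the example, and whether the real method reads the label or the annotation is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// OwnerKeyLabel is copied from the package constants.
const OwnerKeyLabel = "kumori/ownerkey"

// labeled is a hypothetical stand-in for metav1.Object that exposes only
// the labels of a resource.
type labeled interface {
	GetLabels() map[string]string
}

// fakeDeployment is an invented secondary resource used for illustration.
type fakeDeployment struct{ labels map[string]string }

func (d fakeDeployment) GetLabels() map[string]string { return d.labels }

// decompose asserts obj to a labeled resource and returns it together with
// the owner key found in its labels. The key is empty when the resource has
// no owner label.
func decompose(obj any) (labeled, string, error) {
	res, ok := obj.(labeled)
	if !ok {
		return nil, "", errors.New("object does not expose labels")
	}
	return res, res.GetLabels()[OwnerKeyLabel], nil
}

func main() {
	owned := fakeDeployment{labels: map[string]string{OwnerKeyLabel: "ns/demo"}}
	_, key, err := decompose(owned)
	fmt.Printf("key=%q err=%v\n", key, err)

	_, _, err = decompose(42) // not a labeled resource
	fmt.Println("err:", err)
}
```

A handler for secondary changes could then enqueue the returned key whenever it is not empty.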
func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) EnqueueKey ¶
func (c *ControllerRuntime[ TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4, ]) EnqueueKey(key string, operation Operation)
EnqueueKey takes a NATS primary resource key and puts it onto the work queue.
func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) EnqueueKeyAfter ¶ added in v0.0.6
func (c *ControllerRuntime[ TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4, ]) EnqueueKeyAfter(key string, operation Operation, delay time.Duration)
EnqueueKeyAfter takes a NATS primary resource key and puts it onto the work queue, but only after the indicated duration has passed.
func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) Event ¶
func (c *ControllerRuntime[ TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4, ]) Event(object runtime.Object, eventtype string, reason string, message string)
Event constructs an event from the given information and puts it in the queue for sending. The resulting event will be created in the same namespace as the object.
- 'object' is the object this event is about. TBD: the recorder works with Kubernetes objects, not NATS objects.
- 'eventtype' is the type of this event and can be one of Normal or Warning. New types could be added in the future.
- 'reason' is the reason this event is generated. 'reason' should be short and unique; it should be in UpperCamelCase format (starting with a capital letter). 'reason' will be used to automate handling of events, so imagine people writing switch statements to handle them. You want to make that easy.
- 'message' is intended to be human readable.
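The note about switch statements can be made concrete with a short sketch of an event consumer. The function `classify` and its return values are hypothetical. The two reason constants are copied from the package constants so the example stands alone.

```go
package main

import "fmt"

// Copied from the package constants.
const (
	ReasonResourceSynced    = "Synced"
	ReasonErrResourceExists = "ErrResourceExists"
)

// classify is a hypothetical consumer that maps an event reason to a coarse
// category. Short, unique, UpperCamelCase reasons keep this switch simple.
func classify(reason string) string {
	switch reason {
	case ReasonResourceSynced:
		return "ok"
	case ReasonErrResourceExists:
		return "conflict"
	default:
		return "unknown"
	}
}

func main() {
	for _, r := range []string{ReasonResourceSynced, ReasonErrResourceExists, "Other"} {
		fmt.Printf("%s -> %s\n", r, classify(r))
	}
}
```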
func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) Run ¶
func (c *ControllerRuntime[ TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4, ]) Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)
Run will set up the event handlers for the types we are interested in, sync the informer caches, and start the workers. It will block until stopCh is closed, at which point it will shut down the workqueue and wait for workers to finish processing their current work items.
func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) SetCustomController ¶
func (c *ControllerRuntime[ TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4, ]) SetCustomController(customController IController)
type IController ¶
type IController interface {
	// Run starts the reconciliation loop.
	Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)

	// SyncHandler compares the actual state with the desired state and attempts
	// to converge the two. It is triggered every time a primary resource is
	// pulled from the work queue.
	// IF SYNCHANDLER RETURNS AN ERROR, THE PRIMARY RESOURCE IS RE-ENQUEUED TO BE
	// EVALUATED LATER.
	SyncHandler(string) error

	// HandleSecondary is triggered when a secondary object has changed. It is
	// processed in a very simple way: the related primary resource is enqueued
	// to force its re-evaluation.
	HandleSecondary(obj any)

	// GarbageSecondaries is in charge of finding secondary objects whose owner
	// does not exist (so the "DELETE/PURGE" operation was not executed for some
	// reason).
	GarbageSecondaries()

	// Clean must perform any necessary actions when the controller is stopped.
	Clean()
}
IController is the interface that any custom controller must implement.
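A skeleton of a custom controller might look like the sketch below. The interface is copied locally so the example compiles on its own. `demoController`, the `secondary` type, and the `enqueue` callback (a stand-in for the runtime's EnqueueKey) are hypothetical names, and the method bodies are placeholders rather than recommended logic.

```go
package main

import (
	"errors"
	"fmt"
)

// IController is copied from the declaration above.
type IController interface {
	Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)
	SyncHandler(string) error
	HandleSecondary(obj any)
	GarbageSecondaries()
	Clean()
}

// secondary is an invented secondary object that knows its owner's key.
type secondary struct{ ownerKey string }

// demoController is a hypothetical controller with placeholder logic.
type demoController struct {
	enqueue func(key string) // stand-in for the runtime's enqueue function
	synced  []string         // keys reconciled so far
}

// Run only blocks until stopCh is closed in this sketch.
func (d *demoController) Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error) {
	<-stopCh
}

// SyncHandler records the key; returning an error would trigger a retry.
func (d *demoController) SyncHandler(key string) error {
	if key == "" {
		return errors.New("empty key")
	}
	d.synced = append(d.synced, key)
	return nil
}

// HandleSecondary enqueues the owner of a changed secondary object.
func (d *demoController) HandleSecondary(obj any) {
	if s, ok := obj.(secondary); ok && s.ownerKey != "" {
		d.enqueue(s.ownerKey)
	}
}

func (d *demoController) GarbageSecondaries() {}
func (d *demoController) Clean()              {}

// Compile-time check that demoController satisfies the interface.
var _ IController = (*demoController)(nil)

func main() {
	var queue []string
	c := &demoController{enqueue: func(key string) { queue = append(queue, key) }}
	c.HandleSecondary(secondary{ownerKey: "ns/demo"})
	for _, key := range queue {
		fmt.Println("sync", key, "error:", c.SyncHandler(key))
	}
}
```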