controllerruntime

package
v0.0.7 Latest

This package is not in the latest version of its module.

Published: Jan 9, 2025 | License: EUPL-1.2 | Imports: 15 | Imported by: 0

Documentation

Overview

Package controllerruntime is a minimal framework for implementing Kumori controllers in which a NATS resource is the primary resource (the target state) and NATS and/or Kube resources are the secondary resources (on which action is taken to reach that target state). External resources can be managed too, but the framework offers little help with them: unexpected changes in external resources are not detected, so the controller itself must decide how to handle them (for example, by detecting them in periodic resync operations). The design is based on IoC (inversion of control): the controller implements portions of code, but the flow of control (the reconciliation loop) resides in the framework. Basically:

  • The controller must implement an interface (IController), and instantiate the framework (a ControllerRuntime object).
  • The ControllerRuntime is in charge of detecting changes in the target state and invoking the controller functions responsible for reaching it. A Kubernetes workqueue is used to keep track of primary resources pending review, as well as reconciliation retries.

About the workqueue flow:

                |
                |new key
                |
     +----------|----------+
     |                     |   key
+-----  workqueue.get()    -<--------------------+
|    |                     |                     |
|    +----------|----------+                     |
|               |key                             |
|               |                                |
|    +----------|----------+          +----------|----------+
|    |                     |          |                     |
|    |  valid item?        |          |  workqueue.add()    |
|    |                     |          |  [with rate limit]  |
|    +----------|----------+          |                     |
|               |yes                  +----------^----------+
|no             |                                |
|    +----------|----------+                     |
|    |                     |                     |
|    |  syncHandler()      |   error             |
|    |  [reconcile]        ----------------------+
|    |                     |
|    +----------|----------+
|               |
|               | ok
|               |
|    +----------|----------+
|    |                     |
+--->|  workqueue.forget() |
     |                     |
     +----------|----------+
                |
                |
     +----------|----------+
     |                     |
     |  workqueue.done()   |
     |                     |
     +---------------------+
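
The flow in the diagram can be sketched with a toy, single-goroutine queue. This is only an illustration under stated assumptions: toyQueue, processAll and errTransient are hypothetical names, the queue is not the Kubernetes workqueue, no real rate limiting or delay is applied, an empty key stands in for an "invalid item", and done() is called on every path (as the Kubernetes sample-controller does), which the diagram does not show for the error branch.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used to simulate a failed sync.
var errTransient = errors.New("transient failure")

// toyQueue is a simplified stand-in for a rate-limited workqueue.
type toyQueue struct {
	keys     []string
	failures map[string]int // per-key retry counter, reset by forget
}

func newToyQueue() *toyQueue { return &toyQueue{failures: map[string]int{}} }

func (q *toyQueue) add(key string) { q.keys = append(q.keys, key) }

// addRateLimited records the failure and re-enqueues the key.
// A real rate limiter would also delay the re-add.
func (q *toyQueue) addRateLimited(key string) {
	q.failures[key]++
	q.add(key)
}

// get pops the oldest key; ok is false when the queue is empty.
func (q *toyQueue) get() (key string, ok bool) {
	if len(q.keys) == 0 {
		return "", false
	}
	key, q.keys = q.keys[0], q.keys[1:]
	return key, true
}

func (q *toyQueue) forget(key string) { delete(q.failures, key) }
func (q *toyQueue) done(key string)   {} // no-op in this toy queue

// processAll drains the queue following the diagram and returns a trace.
func processAll(q *toyQueue, syncHandler func(string) error) []string {
	var trace []string
	for {
		key, ok := q.get()
		if !ok {
			return trace
		}
		switch {
		case key == "": // invalid item: forget it without syncing
			q.forget(key)
			trace = append(trace, "invalid")
		case syncHandler(key) != nil: // error: retry later
			q.addRateLimited(key)
			trace = append(trace, "retry:"+key)
		default: // ok: clear the retry history
			q.forget(key)
			trace = append(trace, "ok:"+key)
		}
		q.done(key)
	}
}

func main() {
	q := newToyQueue()
	q.add("demo/a")
	q.add("")
	q.add("demo/b")

	attempts := map[string]int{}
	trace := processAll(q, func(key string) error {
		attempts[key]++
		if key == "demo/a" && attempts[key] == 1 {
			return errTransient // first attempt on demo/a fails
		}
		return nil
	})
	fmt.Println(trace) // [retry:demo/a invalid ok:demo/b ok:demo/a]
}
```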


Constants

const (
	ReasonResourceSynced         = "Synced"
	ReasonErrResourceExists      = "ErrResourceExists"
	MessageErrResourceExists     = "Resource %q already exists and is not managed by this controller"
	MessageResourceSyncedCreated = "%q synced successfully (created)"
	MessageResourceSyncedUpdated = "%q synced successfully (updated)"
	MessageResourceSyncedDeleted = "%q synced successfully (deleted)"
)

Messages used with Kubernetes events
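
The message constants contain a %q verb, so they are presumably meant to be filled in with fmt.Sprintf (or a formatting event call) before being used. A minimal sketch, assuming the argument is a resource key; the helper names and the "demo/web" key are hypothetical, and the constants are copied here only to keep the example self-contained.

```go
package main

import "fmt"

// Copied from the constant block above so the sketch compiles on its own.
const (
	MessageErrResourceExists     = "Resource %q already exists and is not managed by this controller"
	MessageResourceSyncedCreated = "%q synced successfully (created)"
)

// syncedCreatedMessage is a hypothetical helper that fills in the key.
func syncedCreatedMessage(key string) string {
	return fmt.Sprintf(MessageResourceSyncedCreated, key)
}

// resourceExistsMessage is a hypothetical helper for the error message.
func resourceExistsMessage(key string) string {
	return fmt.Sprintf(MessageErrResourceExists, key)
}

func main() {
	fmt.Println(syncedCreatedMessage("demo/web"))
	// "demo/web" synced successfully (created)
	fmt.Println(resourceExistsMessage("demo/web"))
	// Resource "demo/web" already exists and is not managed by this controller
}
```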

const (
	OwnerKeyLabel       = "kumori/ownerkey"
	OwnerkindLabel      = "kumori/ownerkind"
	OwnerKeyAnnotation  = "kumori/ownerkey"
	OwnerkindAnnotation = "kumori/ownerkind"
)

Labels and annotations used in secondary objects
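
How the framework writes and reads these keys is not documented on this page. As a rough sketch only, assuming the owner kind and owner key are stored as plain string values in a secondary object's annotations map, a lookup could look like this; ownerFromAnnotations and the sample values are hypothetical.

```go
package main

import "fmt"

// Copied from the constant block above so the sketch compiles on its own.
const (
	OwnerKeyAnnotation  = "kumori/ownerkey"
	OwnerkindAnnotation = "kumori/ownerkind"
)

// ownerFromAnnotations is a hypothetical helper: it returns the owner kind
// and key when both annotations are present, and ok=false otherwise.
func ownerFromAnnotations(annotations map[string]string) (string, string, bool) {
	kind, hasKind := annotations[OwnerkindAnnotation]
	key, hasKey := annotations[OwnerKeyAnnotation]
	if !hasKind || !hasKey {
		return "", "", false
	}
	return kind, key, true
}

func main() {
	// Sample metadata; the kind and key values are made up for illustration.
	annotations := map[string]string{
		OwnerkindAnnotation: "deployment",
		OwnerKeyAnnotation:  "demo/web",
	}
	kind, key, ok := ownerFromAnnotations(annotations)
	fmt.Println(kind, key, ok) // deployment demo/web true
}
```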

Variables

This section is empty.

Functions

This section is empty.

Types

type ControllerRuntime

type ControllerRuntime[
	TNatsPrim entityv1.IEntity,
	TNatsSec1 entityv1.IEntity, TNatsSec2 entityv1.IEntity, TNatsSec3 entityv1.IEntity, TNatsSec4 entityv1.IEntity,
	TNatsTer1 entityv1.IEntity, TNatsTer2 entityv1.IEntity, TNatsTer3 entityv1.IEntity, TNatsTer4 entityv1.IEntity,
] struct {
	// contains filtered or unexported fields
}

ControllerRuntime is the base to implement a controller working in the way described in ...

https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
https://github.com/kubernetes/sample-controller

... but modified to take into account primary resources stored in a NATS store.

It means:

  • There is a primary resource, whose Status you'll be updating. In this case, the primary resource MUST be a NATS object.
  • There are secondary resources, which CAN be Kubernetes/etcd objects or NATS objects, to be created/updated/deleted.
  • There are tertiary resources. They are like secondaries, but they are only read to get additional info.
  • Every secondary resource created holds a reference to the primary NATS resource (equivalent to the standard Kubernetes owner reference used for parent-child relationships).

func NewControllerRuntime

func NewControllerRuntime[
	TNatsPrim entityv1.IEntity,
	TNatsSec1 entityv1.IEntity, TNatsSec2 entityv1.IEntity, TNatsSec3 entityv1.IEntity, TNatsSec4 entityv1.IEntity,
	TNatsTer1 entityv1.IEntity, TNatsTer2 entityv1.IEntity, TNatsTer3 entityv1.IEntity, TNatsTer4 entityv1.IEntity,
](
	name string,
	kind string,
	kubeClientset kubeclientset.Interface,
	primaryNatsClient entityv1.IEntityClient[TNatsPrim],
	secondarySharedInformers []cache.SharedIndexInformer,
	terciarySharedInformers []cache.SharedIndexInformer,
	secondaryNatsClient1 entityv1.IEntityClient[TNatsSec1],
	secondaryNatsClient2 entityv1.IEntityClient[TNatsSec2],
	secondaryNatsClient3 entityv1.IEntityClient[TNatsSec3],
	secondaryNatsClient4 entityv1.IEntityClient[TNatsSec4],
	terciaryNatsClient1 entityv1.IEntityClient[TNatsTer1],
	terciaryNatsClient2 entityv1.IEntityClient[TNatsTer2],
	terciaryNatsClient3 entityv1.IEntityClient[TNatsTer3],
	terciaryNatsClient4 entityv1.IEntityClient[TNatsTer4],
	secondaryExternalClients []externalv1.IExternalClient,
	resyncInterval time.Duration,
) (
	controllerRuntime *ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4],
)

NewControllerRuntime returns a ControllerRuntime object.

func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) DecomposeKubeSecondaryResource

func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) DecomposeKubeSecondaryResource(
	obj any,
) (
	secondaryResource metav1.Object,
	primaryKey string,
	err error,
)

DecomposeKubeSecondaryResource takes a generic any reference and returns the secondary Kube resource and the key of the related primary resource (its owner, if one exists).

func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) EnqueueKey

func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) EnqueueKey(key string, operation Operation)

EnqueueKey takes a NATS primary resource key and puts it onto the work queue.

func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) EnqueueKeyAfter added in v0.0.6

func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) EnqueueKeyAfter(key string, operation Operation, delay time.Duration)

EnqueueKeyAfter takes a NATS primary resource key and puts it onto the work queue, but only after the indicated duration has passed.

func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) Event

func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) Event(object runtime.Object, eventtype string, reason string, message string)

Event constructs an event from the given information and puts it in the queue for sending. The resulting event will be created in the same namespace as the object.

  • 'object' is the object this event is about. TBD (open problem): the recorder works with Kubernetes objects, not NATS objects.
  • 'eventtype' is the type of this event, and can be one of Normal or Warning. New types could be added in the future.
  • 'reason' is the reason this event is generated. 'reason' should be short and unique; it should be in UpperCamelCase format (starting with a capital letter). "reason" will be used to automate handling of events, so imagine people writing switch statements to handle them. You want to make that easy.
  • 'message' is intended to be human readable.

func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) Run

func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)

Run will set up the event handlers for the types we are interested in, as well as syncing informer caches and starting workers. It will block until stopCh is closed, at which point it will shut down the workqueue and wait for workers to finish processing their current work items.
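
Because Run blocks until stopCh is closed, a caller would typically start it in a goroutine and close the channel to request shutdown. The sketch below uses only the Run signature documented above; fakeRuntime and runUntil are hypothetical stand-ins, and how the real framework uses errCh is an assumption (here: the first reported error triggers shutdown).

```go
package main

import (
	"fmt"
	"time"
)

// runner captures only the Run signature documented above.
type runner interface {
	Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)
}

// fakeRuntime is a hypothetical stand-in that blocks until stopCh is closed.
type fakeRuntime struct{ stopped bool }

func (f *fakeRuntime) Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error) {
	<-stopCh
	f.stopped = true
}

// runUntil starts r, requests shutdown after d (or on the first error
// received on errCh), and waits for Run to return.
func runUntil(r runner, threadiness int, d time.Duration) error {
	stopCh := make(chan struct{})
	errCh := make(chan error, 1) // buffered so a reporter never blocks
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		r.Run(threadiness, stopCh, errCh)
	}()

	var err error
	select {
	case err = <-errCh:
	case <-time.After(d):
	}
	close(stopCh) // ask Run to shut down
	<-finished    // wait until Run has returned
	return err
}

func main() {
	f := &fakeRuntime{}
	err := runUntil(f, 2, 10*time.Millisecond)
	fmt.Println(err, f.stopped) // <nil> true
}
```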

func (*ControllerRuntime[TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4]) SetCustomController

func (c *ControllerRuntime[
	TNatsPrim, TNatsSec1, TNatsSec2, TNatsSec3, TNatsSec4, TNatsTer1, TNatsTer2, TNatsTer3, TNatsTer4,
]) SetCustomController(customController IController)

type IController

type IController interface {

	// Run starts the reconciliation loop
	Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)

	// SyncHandler compares the actual state with the desired state and attempts
	// to converge the two. It is triggered every time a primary resource is
	// pulled from the work queue.
	// IF SYNCHANDLER RETURNS AN ERROR, THE PRIMARY RESOURCE IS RE-ENQUEUED TO BE
	// EVALUATED LATER.
	SyncHandler(string) error

	// HandleSecondary is triggered when a secondary object has changed. It is
	// processed in a very simple way: it forces re-evaluation of the related
	// primary resource by enqueueing it.
	HandleSecondary(obj any)

	// GarbageSecondaries is in charge of searching for secondary objects whose
	// owner no longer exists (so the "DELETE/PURGE" operation was not executed
	// for some reason).
	GarbageSecondaries()

	// Clean must perform any necessary actions when the controller is stopped.
	Clean()
}

IController is the interface that any custom controller must implement.
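
A skeleton implementation can help to see what a custom controller has to provide. The sketch below copies the interface so that it compiles on its own; recordingController is hypothetical and only records which callbacks were invoked, and a real controller would do its reconciliation work in SyncHandler and would normally delegate Run to the ControllerRuntime.

```go
package main

import (
	"errors"
	"fmt"
)

// IController is copied from the documentation above.
type IController interface {
	Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error)
	SyncHandler(string) error
	HandleSecondary(obj any)
	GarbageSecondaries()
	Clean()
}

// recordingController is a hypothetical, do-nothing implementation.
type recordingController struct {
	calls []string
}

// Compile-time check that the skeleton satisfies the interface.
var _ IController = (*recordingController)(nil)

// Run blocks until stopCh is closed (a real controller would start workers).
func (c *recordingController) Run(threadiness int, stopCh <-chan struct{}, errCh chan<- error) {
	c.calls = append(c.calls, "Run")
	<-stopCh
}

// SyncHandler returns an error for an empty key, which per the interface
// contract would cause the primary resource to be re-enqueued.
func (c *recordingController) SyncHandler(key string) error {
	if key == "" {
		return errors.New("empty primary key")
	}
	c.calls = append(c.calls, "SyncHandler:"+key)
	return nil
}

func (c *recordingController) HandleSecondary(obj any) {
	c.calls = append(c.calls, "HandleSecondary")
}

func (c *recordingController) GarbageSecondaries() {
	c.calls = append(c.calls, "GarbageSecondaries")
}

func (c *recordingController) Clean() {
	c.calls = append(c.calls, "Clean")
}

func main() {
	c := &recordingController{}
	fmt.Println(c.SyncHandler("demo/web")) // <nil>
	c.HandleSecondary(struct{}{})
	c.Clean()
	fmt.Println(c.calls) // [SyncHandler:demo/web HandleSecondary Clean]
}
```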

type Operation

type Operation uint8

Operation type is just an encoding of NATS operations.

const (
	OperationPut Operation = iota
	OperationDelete
	OperationPurge
	OperationSecondary
)

Allowed values for Operation type
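
Since the constants are declared with iota, they take the values 0 to 3 in declaration order. The sketch below copies the type and constants and adds a String method for logging; that method is hypothetical and is not listed in the package documentation.

```go
package main

import "fmt"

// Operation and its constants are copied from the documentation above.
type Operation uint8

const (
	OperationPut Operation = iota
	OperationDelete
	OperationPurge
	OperationSecondary
)

// String is a hypothetical addition that makes operations readable in logs.
func (op Operation) String() string {
	switch op {
	case OperationPut:
		return "Put"
	case OperationDelete:
		return "Delete"
	case OperationPurge:
		return "Purge"
	case OperationSecondary:
		return "Secondary"
	default:
		return fmt.Sprintf("Operation(%d)", uint8(op))
	}
}

func main() {
	fmt.Println(uint8(OperationPut), uint8(OperationSecondary)) // 0 3
	fmt.Println(OperationPurge)                                 // Purge
	fmt.Println(Operation(9))                                   // Operation(9)
}
```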
