Documentation
¶
Index ¶
- Constants
- Variables
- func AliveWindowBuckets(staleness, bucketSize time.Duration) []string
- func BucketDeadTime(bucketName string, bucketSize, staleness time.Duration) time.Time
- func BucketName(ts time.Time, bucketSize time.Duration) string
- func BucketTime(bucketName string, bucketSize time.Duration) time.Time
- func ContextWithSelector(ctx context.Context, selector string) context.Context
- func DeadWindowBuckets(staleness, bucketSize time.Duration) []string
- func LoggerFromContext(ctx context.Context) logr.Logger
- func NormalizeAny(t any) (any, error)
- func NormalizeFQN(fqn, defaultNamespace string) (string, error)
- func NormalizeSelector(selector, defaultNamespace string) (string, error)
- func ScalarFromString(val string, scalar PrimitiveType) (any, error)
- func ScalarString(val any) string
- func SelectorFromContext(ctx context.Context) (string, error)
- func ToLowLevelValue[T LowLevelValue](v any) T
- type AggrFn
- type BindConfig
- type CollectNotification
- type CollectNotifierFactory
- type ContextKey
- type DataSource
- type DataSourceGetter
- type DataSourceManager
- type DataSourceReconcile
- type DataSourceReconcileRequest
- type Engine
- type ExtendedManager
- type FeatureApply
- type FeatureDescriptor
- type FeatureDescriptorGetter
- type FeatureManager
- type HistoricalWriter
- type HistoricalWriterFactory
- type KeepPrevious
- type Keys
- type Logger
- type LowLevelValue
- type ManagerEngine
- type Middleware
- type MiddlewareHandler
- type ModelDescriptor
- type ModelReconcileRequest
- type ModelServer
- type Notification
- type Notifier
- type NotifierFactory
- type ParsedProgram
- type Pipeliner
- type Plugins
- type PrimitiveType
- type RawBucket
- type RawBuckets
- type RuntimeManager
- type State
- type StateFactory
- type StateMethod
- type Value
- type WindowResultMap
- type WriteNotification
- type WriteNotifierFactory
Constants ¶
const DeadGracePeriod = time.Minute * 10
DeadGracePeriod is the *extra* time that the bucket should be kept alive on top of the feature's Staleness. Bucket TTL = staleness + DeadGracePeriod
const ModelBuilder = "model"
const SourcelessBuilder = "sourceless"
Variables ¶
var ErrFeatureAlreadyExists = fmt.Errorf("feature already exists")
ErrFeatureAlreadyExists is returned when a feature is already registered in the Core's engine manager.
var ErrFeatureNotFound = fmt.Errorf("feature not found")
ErrFeatureNotFound is returned when a feature is not found in the Core's engine manager.
var ErrInvalidPipelineContext = fmt.Errorf("invalid pipeline context")
ErrInvalidPipelineContext is returned when the context is invalid for pipelining.
var ErrUnsupportedAggrError = fmt.Errorf("unsupported aggr")
ErrUnsupportedAggrError is returned when an aggregate function is not supported.
var ErrUnsupportedPrimitiveError = fmt.Errorf("unsupported primitive")
ErrUnsupportedPrimitiveError is returned when a primitive is not supported.
var FQNRegExp = regexp.MustCompile(`(?si)^((?P<namespace>[a-z0-9]+(?:_[a-z0-9]+)*)\.)?(?P<name>[a-z0-9]+(?:_[a-z0-9]+)*)(\+(?P<aggrFn>([a-z]+_*[a-z]+)))?(@-(?P<version>([0-9]+)))?(\[(?P<encoding>([a-z]+_*[a-z]+))])?$`)
Functions ¶
func AliveWindowBuckets ¶
func AliveWindowBuckets(staleness, bucketSize time.Duration) []string
AliveWindowBuckets returns a list of all the *valid* buckets up until now
func BucketDeadTime ¶
func BucketDeadTime(bucketName string, bucketSize, staleness time.Duration) time.Time
BucketDeadTime returns the end time of a given bucket by its name
func BucketName ¶
func BucketName(ts time.Time, bucketSize time.Duration) string
BucketName returns a bucket name for a given timestamp and a bucket size
func BucketTime ¶
func BucketTime(bucketName string, bucketSize time.Duration) time.Time
BucketTime returns the start time of a given bucket by its name
func ContextWithSelector ¶
func ContextWithSelector(ctx context.Context, selector string) context.Context
func DeadWindowBuckets ¶
func DeadWindowBuckets(staleness, bucketSize time.Duration) []string
DeadWindowBuckets returns a list of the names of *dead* buckets (buckets that are outside the window) that should be available
func LoggerFromContext ¶
func LoggerFromContext(ctx context.Context) logr.Logger
LoggerFromContext returns the logger from the context. If none is found, it returns a logger that discards all output.
func NormalizeAny ¶
func NormalizeAny(t any) (any, error)
func NormalizeFQN ¶
func NormalizeFQN(fqn, defaultNamespace string) (string, error)
NormalizeFQN returns an FQN with the namespace
func NormalizeSelector ¶
func NormalizeSelector(selector, defaultNamespace string) (string, error)
NormalizeSelector returns a selector with the default namespace if not specified
func ScalarFromString ¶
func ScalarFromString(val string, scalar PrimitiveType) (any, error)
func ScalarString ¶
func ScalarString(val any) string
func SelectorFromContext ¶
func SelectorFromContext(ctx context.Context) (string, error)
func ToLowLevelValue ¶
func ToLowLevelValue[T LowLevelValue](v any) T
ToLowLevelValue returns the low level value of the feature
Types ¶
type AggrFn ¶
type AggrFn int
AggrFn is an aggregation function
const (
AggrFnUnknown AggrFn = iota
AggrFnSum
AggrFnAvg
AggrFnMax
AggrFnMin
AggrFnCount
)
func ParseSelector ¶
func ParseSelector(fqn string) (namespace, name string, aggrFn AggrFn, version uint, encoding string, err error)
func StringToAggrFn ¶
func StringToAggrFn(s string) AggrFn
func StringsToAggrFns ¶
func StringsToAggrFns(fns []string) ([]AggrFn, error)
type BindConfig ¶
type BindConfig func(set *pflag.FlagSet) error
BindConfig adds config flags for the plugin.
type CollectNotification ¶
type CollectNotification struct {
FQN string `json:"fqn"`
EncodedKeys string `json:"encoded_keys"`
Bucket string `json:"bucket,omitempty"`
}
type CollectNotifierFactory ¶
type CollectNotifierFactory NotifierFactory[CollectNotification]
type ContextKey ¶
type ContextKey int
ContextKey is a key to store data in context.
const (
// ContextKeyCachePostGet is a key to store the flag to cache in the storage postGet value.
// If not set, it defaults to true.
ContextKeyCachePostGet ContextKey = iota
// ContextKeyCacheFresh is a key to store the flag that indicates whether the result from the cache was fresh.
ContextKeyCacheFresh
// ContextKeyFromCache is a key to store the flag to indicate if the value is from the cache.
ContextKeyFromCache
// ContextKeyLogger is a key to store a logger.
ContextKeyLogger
// ContextKeySelector is a key to store the requested Feature Selector.
ContextKeySelector
)
type DataSource ¶
type DataSource struct {
FQN string `json:"fqn"`
Kind string `json:"kind"`
Config manifests.ParsedConfig `json:"config"`
}
DataSource is a parsed abstracted representation of a manifests.DataSource
func DataSourceFromManifest ¶
func DataSourceFromManifest(ctx context.Context, src *manifests.DataSource, r client.Reader) (DataSource, error)
DataSourceFromManifest returns a DataSource from a manifests.DataSource
type DataSourceGetter ¶
type DataSourceGetter interface {
GetDataSource(FQN string) (DataSource, error)
}
DataSourceGetter is a simple interface that returns a DataSource
type DataSourceManager ¶
type DataSourceManager interface {
BindDataSource(fd DataSource) error
UnbindDataSource(FQN string) error
HasDataSource(FQN string) bool
}
DataSourceManager manages DataSource(s) within Core. It is responsible for maintaining the DataSource(s) in an internal store.
type DataSourceReconcile ¶
type DataSourceReconcile func(ctx context.Context, rr DataSourceReconcileRequest) (changed bool, err error)
DataSourceReconcile is the interface to be implemented by plugins that want to be reconciled in the operator. This is useful for plugins that need to spawn an external Feature Ingestion.
It returns true if the reconciliation has changed the object (and therefore the operator should re-queue).
type DataSourceReconcileRequest ¶
type DataSourceReconcileRequest struct {
DataSource *manifests.DataSource
RuntimeManager RuntimeManager
Client client.Client
Scheme *runtime.Scheme
CoreAddress string
}
DataSourceReconcileRequest contains metadata for the reconcile.
type Engine ¶
type Engine interface {
// FeatureDescriptor returns the FeatureDescriptor for the given FQN
FeatureDescriptor(ctx context.Context, selector string) (FeatureDescriptor, error)
// Get returns the value for the given FQN and keys
// If the feature is not available, it returns nil.
// If the feature is windowed, the returned Value is a map from window function to Value.
Get(ctx context.Context, selector string, keys Keys) (Value, FeatureDescriptor, error)
// Set sets the raw value for the given FQN and keys
// If the feature's primitive is a List, it replaces the entire list.
// If the feature is windowed, it is aliased to WindowAdd instead of Set.
Set(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error
// Append appends to the raw value for the given FQN and keys
// If the feature's primitive is NOT a List, it returns an error.
Append(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error
// Incr increments the raw value of the feature.
// If the feature's primitive is NOT a Scalar, it returns an error.
// It returns an error if one occurred.
Incr(ctx context.Context, FQN string, keys Keys, by any, ts time.Time) error
// Update is the common function to update a feature SimpleValue.
// Under the hood, it utilizes lower-level functions depending on the type of the feature.
// - Set for Scalars
// - Append for Lists
// - WindowAdd for Windows
Update(ctx context.Context, FQN string, keys Keys, val any, ts time.Time) error
}
Engine is the main engine of the Core. It is responsible for the low-level operations for the features against the feature store.
type ExtendedManager ¶
type ExtendedManager interface {
Engine
RuntimeManager
DataSourceGetter
}
ExtendedManager is an Engine that has a DataSource
type FeatureApply ¶
type FeatureApply func(fd FeatureDescriptor, builder manifests.FeatureBuilder, pl Pipeliner, src ExtendedManager) error
FeatureApply applies changes on the feature abstraction.
type FeatureDescriptor ¶
type FeatureDescriptor struct {
FQN string `json:"FQN"`
Primitive PrimitiveType `json:"primitive"`
Aggr []AggrFn `json:"aggr"`
Freshness time.Duration `json:"freshness"`
Staleness time.Duration `json:"staleness"`
Timeout time.Duration `json:"timeout"`
KeepPrevious *KeepPrevious `json:"keep_previous"`
Keys []string `json:"keys"`
Builder string `json:"builder"`
RuntimeEnv string `json:"runtimeEnv"`
DataSource string `json:"data_source"`
Dependencies []string `json:"dependencies"`
}
FeatureDescriptor is describing a feature definition for an internal use of the Core.
func FeatureDescriptorFromManifest ¶
func FeatureDescriptorFromManifest(in *manifests.Feature) (*FeatureDescriptor, error)
FeatureDescriptorFromManifest returns a FeatureDescriptor from a manifests.Feature
func (FeatureDescriptor) ValidWindow ¶
func (fd FeatureDescriptor) ValidWindow() bool
ValidWindow checks if the feature has aggregation enabled, and if it is valid
type FeatureDescriptorGetter ¶
type FeatureDescriptorGetter func(ctx context.Context, FQN string) (FeatureDescriptor, error)
type FeatureManager ¶
type FeatureManager interface {
BindFeature(in *manifests.Feature) error
UnbindFeature(FQN string) error
HasFeature(FQN string) bool
}
FeatureManager manages Feature(s) within Core. It is responsible for managing features as well as operating on them.
type HistoricalWriter ¶
type HistoricalWriter interface {
Commit(context.Context, WriteNotification) error
Flush(ctx context.Context, fqn string) error
FlushAll(context.Context) error
Close(ctx context.Context) error
BindFeature(fd *FeatureDescriptor, model *manifests.ModelSpec, getter FeatureDescriptorGetter) error
}
type HistoricalWriterFactory ¶
type HistoricalWriterFactory func(viper *viper.Viper) (HistoricalWriter, error)
type KeepPrevious ¶
type KeepPrevious struct {
Versions uint
Over time.Duration
}
type Logger ¶
type Logger interface {
Logger() logr.Logger
}
Logger is a simple interface that returns a Logr.Logger
type LowLevelValue ¶
type LowLevelValue interface {
~int | ~string | ~float64 | time.Time | ~[]int | ~[]string | ~[]float64 | ~[]time.Time | WindowResultMap
}
LowLevelValue is a low level value that can be cast to any type
type ManagerEngine ¶
type ManagerEngine interface {
Logger
FeatureManager
DataSourceManager
RuntimeManager
Engine
}
ManagerEngine is the business-logic implementation of the Core
type Middleware ¶
type Middleware func(next MiddlewareHandler) MiddlewareHandler
type MiddlewareHandler ¶
type MiddlewareHandler func(ctx context.Context, fd FeatureDescriptor, keys Keys, val Value) (Value, error)
type ModelDescriptor ¶
type ModelDescriptor struct {
Features []string `json:"features"`
KeyFeature string `json:"keyFeature,omitempty"`
Keys []string `json:"keys"`
ModelFramework string `json:"modelFramework"`
ModelServer string `json:"modelServer"`
InferenceConfig manifests.ParsedConfig `json:"inferenceConfig"`
}
type ModelReconcileRequest ¶
type ModelReconcileRequest struct {
Model *manifests.Model
Client client.Client
Scheme *runtime.Scheme
}
ModelReconcileRequest contains metadata for the reconcile.
type ModelServer ¶
type ModelServer interface {
Reconcile(ctx context.Context, rr ModelReconcileRequest) (changed bool, err error)
Owns() []client.Object
Serve(ctx context.Context, fd FeatureDescriptor, md ModelDescriptor, val Value) (Value, error)
}
ModelServer is the interface to be implemented by plugins that implement a Model Server.
type Notification ¶
type Notification interface {
CollectNotification | WriteNotification
}
type Notifier ¶
type Notifier[T Notification] interface {
Notify(context.Context, T) error
Subscribe(context.Context) (<-chan T, error)
}
Notifier is the interface to be implemented by plugins that want to provide a Queue implementation. The Queue is used to sync notifications between instances.
type NotifierFactory ¶
type NotifierFactory[T Notification] func(viper *viper.Viper) (Notifier[T], error)
NotifierFactory is the interface to be implemented by plugins that implement Notifier.
type ParsedProgram ¶
type ParsedProgram struct {
// Primitive is the primitive that this program is returning
Primitive PrimitiveType
// Dependencies is a list of FQNs that this program *might* depend on
Dependencies []string
}
type Pipeliner ¶
type Pipeliner interface {
AddPreGetMiddleware(priority int, fn Middleware)
AddPostGetMiddleware(priority int, fn Middleware)
AddPreSetMiddleware(priority int, fn Middleware)
AddPostSetMiddleware(priority int, fn Middleware)
}
Pipeliner is the interface that plugins can use to modify the Core's feature pipelines on creation time
type Plugins ¶
type Plugins interface {
BindConfig | FeatureApply | DataSourceReconcile | StateFactory |
CollectNotifierFactory | WriteNotifierFactory |
HistoricalWriterFactory
}
type PrimitiveType ¶
type PrimitiveType int
const (
PrimitiveTypeUnknown PrimitiveType = iota
PrimitiveTypeString
PrimitiveTypeInteger
PrimitiveTypeFloat
PrimitiveTypeBoolean
PrimitiveTypeTimestamp
PrimitiveTypeStringList
PrimitiveTypeIntegerList
PrimitiveTypeFloatList
PrimitiveTypeBooleanList
PrimitiveTypeTimestampList
)
func StringToPrimitiveType ¶
func StringToPrimitiveType(s string) PrimitiveType
func TypeDetect ¶
func TypeDetect(t any) PrimitiveType
TypeDetect detects the PrimitiveType of the value.
type RawBucket ¶
type RawBucket struct {
FQN string `json:"FQN"`
Bucket string `json:"bucket"`
EncodedKeys string `json:"encoded_keys"`
Data WindowResultMap `json:"raw"`
}
RawBucket is the data that is stored in the raw bucket.
type RawBuckets ¶
type RawBuckets []RawBucket
type RuntimeManager ¶
type RuntimeManager interface {
// LoadProgram loads a program into the runtime.
LoadProgram(env, fqn, program string, packages []string) (*ParsedProgram, error)
// ExecuteProgram executes a program in the runtime.
ExecuteProgram(ctx context.Context, env string, fqn string, keys Keys, row map[string]any, ts time.Time, dryRun bool) (value Value, keyz Keys, err error)
// GetSidecars returns the sidecar containers attached to the current container.
GetSidecars() []v1.Container
// GetDefaultEnv returns the default environment for the current container.
GetDefaultEnv() string
}
type State ¶
type State interface {
// Get returns the SimpleValue of the feature.
// If the feature is not available, it returns nil.
// If the feature is windowed, the returned SimpleValue is a map from window function to SimpleValue.
// version indicates the previous version of the feature. If version is 0, the latest version is returned.
Get(ctx context.Context, fd FeatureDescriptor, keys Keys, version uint) (*Value, error)
// Set sets the SimpleValue of the feature.
// If the feature's primitive is a List, it replaces the entire list.
// If the feature is windowed, it is aliased to WindowAdd instead of Set.
Set(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error
// Append appends the SimpleValue to the feature.
// If the feature's primitive is NOT a List, it returns an error.
Append(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, ts time.Time) error
// Incr increments the SimpleValue of the feature.
// If the feature's primitive is NOT a Scalar, it returns an error.
// It returns an error if one occurred.
Incr(ctx context.Context, fd FeatureDescriptor, keys Keys, by any, timestamp time.Time) error
// Update is the common function to update a feature SimpleValue.
// Under the hood, it utilizes lower-level functions depending on the type of the feature.
// - Set for Scalars
// - Append for Lists
// - WindowAdd for Windows
Update(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error
// WindowAdd adds a Bucket to the window that contains aggregated data internally
// Later the bucket's aggregations should be aggregated for the whole Window via Get
//
// Buckets should last *at least* as long as the feature's staleness time + DeadGracePeriod
WindowAdd(ctx context.Context, fd FeatureDescriptor, keys Keys, val any, timestamp time.Time) error
// WindowBuckets returns the list of RawBuckets for the feature and specific Keys.
WindowBuckets(ctx context.Context, fd FeatureDescriptor, keys Keys, buckets []string) (RawBuckets, error)
// DeadWindowBuckets returns the list of all the dead feature's RawBuckets of all the entities.
DeadWindowBuckets(ctx context.Context, fd FeatureDescriptor, ignore RawBuckets) (RawBuckets, error)
// Ping is a simple keepalive check for the state.
// It should return an error in case an error occurred, or nil if everything is alright.
Ping(ctx context.Context) error
}
State is a feature state management layer
type StateFactory ¶
type StateFactory func(viper *viper.Viper) (State, error)
StateFactory is the interface to be implemented by plugins that implement storage providers.
type StateMethod ¶
type StateMethod int
StateMethod is a method that can be used with a State.
const (
StateMethodGet StateMethod = iota
StateMethodSet
StateMethodAppend
StateMethodIncr
StateMethodUpdate
StateMethodWindowAdd
)
type Value ¶
type Value struct {
// Value can be cast to LowLevelValue using the ToLowLevelValue() method
Value any `json:"value"`
Timestamp time.Time `json:"timestamp"`
Fresh bool `json:"fresh"`
}
Value is storing a feature value.
type WindowResultMap ¶
type WindowResultMap map[AggrFn]float64
WindowResultMap is a map of AggrFn and their aggregated results
type WriteNotification ¶
type WriteNotification struct {
FQN string `json:"fqn"`
EncodedKeys string `json:"encoded_keys"`
Bucket string `json:"bucket,omitempty"`
ActiveBucket bool `json:"active_bucket,omitempty"`
Value *Value `json:"value,omitempty"`
}
type WriteNotifierFactory ¶
type WriteNotifierFactory NotifierFactory[WriteNotification]
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
proto
|
|
gen/go
Module
|
|
Package v1alpha1 contains API Schema definitions for the k8s.raptor.ml v1alpha1 API group +kubebuilder:object:generate=true +groupName=k8s.raptor.ml
|
Package v1alpha1 contains API Schema definitions for the k8s.raptor.ml v1alpha1 API group +kubebuilder:object:generate=true +groupName=k8s.raptor.ml |