Documentation
¶
Index ¶
- Constants
- type Action
- type Adapter
- type DelayedScenario
- type DelayedScenarioManager
- type DelayedScenarioStorage
- type DelayedScenarioTask
- type ExecuteScenariosTask
- type Execution
- type RpcResult
- type Scenario
- type ScenarioExecution
- type ScenarioExecutionStorage
- type ScenarioResult
- type ScenarioStorage
- type Service
- type Task
- type TaskManager
- type TaskResult
- type WorkerPool
Constants ¶
View Source
const ( PriorityField = "priority" IdField = "_id" )
View Source
const ( TaskNew = iota TaskNotMatched TaskCancelled TaskRpcError )
View Source
const AbandonedDuration = 60
View Source
const MaxRetries = 5
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action struct { Type string `bson:"type" json:"type"` Parameters interface{} `bson:"parameters,omitempty" json:"parameters,omitempty"` // parameters for the action AlarmPatterns pattern.AlarmPatternList `bson:"alarm_patterns" json:"alarm_patterns"` EntityPatterns pattern.EntityPatternList `bson:"entity_patterns" json:"entity_patterns"` DropScenarioIfNotMatched bool `bson:"drop_scenario_if_not_matched" json:"drop_scenario_if_not_matched"` EmitTrigger bool `bson:"emit_trigger" json:"emit_trigger"` }
Action represents a canopsis Action on alarms.
func (*Action) UnmarshalBSONValue ¶
func (*Action) UnmarshalJSON ¶
type Adapter ¶
type Adapter interface { GetEnabled() ([]Scenario, error) GetEnabledById(id string) (Scenario, error) GetEnabledByIDs(ids []string) ([]Scenario, error) }
func NewAdapter ¶
type DelayedScenario ¶
type DelayedScenarioManager ¶
type DelayedScenarioManager interface { AddDelayedScenario(context.Context, types.Alarm, Scenario) error PauseDelayedScenarios(context.Context, types.Alarm) error ResumeDelayedScenarios(context.Context, types.Alarm) error Run(context.Context) (<-chan DelayedScenarioTask, error) }
func NewDelayedScenarioManager ¶
func NewDelayedScenarioManager( adapter Adapter, alarmAdapter libalarm.Adapter, storage DelayedScenarioStorage, periodicalTimeout time.Duration, logger zerolog.Logger, ) DelayedScenarioManager
type DelayedScenarioStorage ¶
type DelayedScenarioStorage interface { Add(ctx context.Context, scenario DelayedScenario) (string, error) GetAll(ctx context.Context) ([]DelayedScenario, error) Get(ctx context.Context, id string) (*DelayedScenario, error) Delete(ctx context.Context, id string) (bool, error) Update(ctx context.Context, scenario DelayedScenario) (bool, error) }
type DelayedScenarioTask ¶
type ExecuteScenariosTask ¶
type Scenario ¶
type Scenario struct { ID string `bson:"_id,omitempty" json:"_id,omitempty"` Name string `bson:"name" json:"name"` Author string `bson:"author" json:"author"` Enabled bool `bson:"enabled" json:"enabled"` DisableDuringPeriods []string `bson:"disable_during_periods" json:"disable_during_periods"` Triggers []string `bson:"triggers" json:"triggers"` Actions []Action `bson:"actions" json:"actions"` Priority int `bson:"priority" json:"priority"` Delay *types.DurationWithUnit `bson:"delay" json:"delay"` Created types.CpsTime `bson:"created,omitempty" json:"created,omitempty"` Updated types.CpsTime `bson:"updated,omitempty" json:"updated,omitempty"` }
func (Scenario) IsTriggered ¶
type ScenarioExecution ¶
type ScenarioExecution struct { ID string `json:"-"` ScenarioID string `json:"-"` AlarmID string `json:"-"` Entity types.Entity `json:"e"` ActionExecutions []Execution `json:"ae"` LastUpdate int64 `json:"u"` AckResources bool `json:"ar"` Tries int64 `json:"t"` Header map[string]string `json:"h,omitempty"` Response map[string]interface{} `json:"r,omitempty"` }
type ScenarioExecutionStorage ¶
type ScenarioExecutionStorage interface { Get(ctx context.Context, executionID string) (*ScenarioExecution, error) GetAbandoned(ctx context.Context) ([]ScenarioExecution, error) Create(ctx context.Context, execution ScenarioExecution) (string, error) Update(ctx context.Context, execution ScenarioExecution) error Del(ctx context.Context, executionID string) error Inc(ctx context.Context, id string, inc int64, drop bool) (int64, error) }
type ScenarioResult ¶
type ScenarioStorage ¶
type ScenarioStorage interface { // ReloadScenarios trigger a refresh on scenarios cache from DB ReloadScenarios() error // GetTriggeredScenarios returns scenarios which are triggered by triggers. GetTriggeredScenarios( triggers []string, alarm types.Alarm, ) (triggered []Scenario, err error) // RunDelayedScenarios starts delay timeout for scenarios which are triggered by triggers. RunDelayedScenarios( ctx context.Context, triggers []string, alarm types.Alarm, entity types.Entity, ) error // GetScenario returns scenario. GetScenario(id string) *Scenario }
ScenarioStorage is used to provide scenarios.
func NewScenarioStorage ¶
func NewScenarioStorage( actionAdapter Adapter, delayedScenarioManager DelayedScenarioManager, logger zerolog.Logger, ) ScenarioStorage
type Service ¶
type Service interface { // Process parse an event to see if an action is suitable. Process(ctx context.Context, event *types.Event) error // ListenScenarioFinish receives message when all scenarios for event are finished // and acknowledges fifo. ListenScenarioFinish(ctx context.Context, channel <-chan ScenarioResult) // ProcessAbandonedExecutions checks execution storage and processes executions which // weren't updated for a long time ProcessAbandonedExecutions(ctx context.Context) error }
Service allows you to manipulate actions.
func NewService ¶
func NewService( alarmAdapter libalarm.Adapter, scenarioInputChan chan<- ExecuteScenariosTask, delayedScenarioManager DelayedScenarioManager, storage ScenarioExecutionStorage, encoder encoding.Encoder, fifoChan libamqp.Channel, fifoExchange string, fifoQueue string, activationService libalarm.ActivationService, logger zerolog.Logger, ) Service
NewService gives the correct action adapter.
type TaskManager ¶
type TaskManager interface { Run(ctx context.Context, rpcResultChannel <-chan RpcResult, inputChannel <-chan ExecuteScenariosTask) (<-chan ScenarioResult, error) }
TaskManager is used to execute scenarios.
func NewTaskManager ¶
func NewTaskManager( workerPool WorkerPool, executionStorage ScenarioExecutionStorage, scenarioStorage ScenarioStorage, logger zerolog.Logger, ) TaskManager
type TaskResult ¶
type WorkerPool ¶
type WorkerPool interface {
RunWorkers(ctx context.Context, taskChannel <-chan Task) (<-chan TaskResult, error)
}
Click to show internal directories.
Click to hide internal directories.