Documentation
¶
Index ¶
- Constants
- Variables
- func Run(config *viper.Viper, s chan error) (err error)
- type Agent
- func (a *Agent) GetSchemas() map[string]map[string]map[string]*sources.Column
- func (a *Agent) HealthCheck() (alive bool)
- func (a *Agent) InitAgent() (err error)
- func (a *Agent) InitRemoteMeta()
- func (a *Agent) LoadDeMultiplexer(demux *map[string][]string) error
- func (a *Agent) LoadMultiplexer(multiplexer *map[string][]string) error
- func (a *Agent) LoadSink(sinkName string, sinkType string) (err error)
- func (a *Agent) LoadSinks() (err error)
- func (a *Agent) LoadSource(sourceName string, sourceType string) (err error)
- func (a *Agent) LoadSources(multiplexer *map[string][]string, demux *map[string][]string) (err error)
- func (a *Agent) Poller()
- func (a *Agent) ProcessTask(task utils.Task) (err error)
- func (a *Agent) RemoteInit() error
- func (a *Agent) SendCapabilities() (err error)
- func (a *Agent) Start() error
- type Auth
- type Controller
- func (c *Controller) GetConfiguration() (config []byte, err error)
- func (c *Controller) GetMeta(metaName string) (meta utils.Metas, err error)
- func (c *Controller) GetTasks(limit int) (task []utils.Task, err error)
- func (c *Controller) SendCapabilities(tasks map[string]*utils.TaskDescription) (err error)
- func (c *Controller) SendMeta(meta utils.Metas) (err error)
- func (c *Controller) SendSchema(sourceName string, schema map[string]map[string]*sources.Column) (err error)
- func (c *Controller) SendSourcesCapabilities(sourceName string, tasks map[string]*utils.TaskDescription) (err error)
- func (c *Controller) UpdateTasks(task utils.Task) (err error)
- type ControllerConfig
- type DeMultiplexer
- type Multiplexer
- type Schema
Constants ¶
const ( AgentStatusStarting = "STARTING" AgentStatusWaitingForConf = "WAITING_FOT_CONFIGURATION" AgentStatusRegistred = "REGISTRED" AgentStatusUnRegistred = "UNREGISTRED" AgentStatusOnline = "ONLINE" AgentStatusOffline = "OFFLINE" AgentStatusOnError = "ON_ERROR" )
Possible Statuses
const AuthPath = "/auth/token"
AuthPath path of the authentication endpoint
const (
DefaultTimeOut = 60
)
const HeaderDcc = "X-Dcc-Auth"
Variables ¶
var WaitAuth = time.Second * 5
WaitAuth is the number of seconds to wait if authentication failed
Functions ¶
Types ¶
type Agent ¶
Agent representation of agent
func (*Agent) GetSchemas ¶
GetSchemas get schema from sources
func (*Agent) HealthCheck ¶
HealthCheck returns true if the agent and all sources are up
func (*Agent) InitAgent ¶ added in v0.2.0
InitAgent prepares the agent from the configuration: it initializes the multiplexer, sources and sinks
func (*Agent) InitRemoteMeta ¶ added in v0.2.0
func (a *Agent) InitRemoteMeta()
InitRemoteMeta gets meta from the remote and assigns it to sources
func (*Agent) LoadDeMultiplexer ¶ added in v0.2.0
LoadDeMultiplexer sets up the DeMultiplexer; each source has its own DeMultiplexer
func (*Agent) LoadMultiplexer ¶
LoadMultiplexer sets up the Multiplexer; each source has its own multiplexer
func (*Agent) LoadSource ¶
LoadSource create Source from name
func (*Agent) LoadSources ¶
func (a *Agent) LoadSources(multiplexer *map[string][]string, demux *map[string][]string) (err error)
LoadSources Load all Sources init sources from conf
func (*Agent) Poller ¶ added in v0.2.0
func (a *Agent) Poller()
Poller sends meta and gets tasks at each tick; on every tick it calls sendMetaAndGetProcessTask
func (*Agent) ProcessTask ¶ added in v0.2.0
ProcessTask process given task from server and update status
func (*Agent) RemoteInit ¶ added in v0.2.0
RemoteInit initializes the controller, gets the configuration and meta from the remote server, and sends capabilities and schema to the remote server
func (*Agent) SendCapabilities ¶ added in v0.2.0
SendCapabilities send capability to server
type Auth ¶
type Auth struct {
// contains filtered or unexported fields
}
Auth representation of auth
type Controller ¶
type Controller struct { PendingTask int // contains filtered or unexported fields }
Controller allow the collector to be controlled by API
func NewControllerClient ¶
func NewControllerClient(conf *viper.Viper, auth *Auth) *Controller
NewControllerClient creates a new controller from the configuration and returns an initialized controller
func (*Controller) GetConfiguration ¶ added in v0.2.0
func (c *Controller) GetConfiguration() (config []byte, err error)
GetConfiguration gets the configuration from the server and returns sources and sinks
func (*Controller) GetMeta ¶ added in v0.2.0
func (c *Controller) GetMeta(metaName string) (meta utils.Metas, err error)
GetMeta gets metadata by name from the API; if metaName is empty it gets all metas. It returns the meta as a Metas object
func (*Controller) GetTasks ¶ added in v0.2.0
func (c *Controller) GetTasks(limit int) (task []utils.Task, err error)
GetTasks gets the list of tasks the collector needs to execute from the API; if limit is set to -1 it returns all tasks, otherwise it returns at most limit tasks
func (*Controller) SendCapabilities ¶ added in v0.2.0
func (c *Controller) SendCapabilities(tasks map[string]*utils.TaskDescription) (err error)
SendCapabilities send the currently supported capabilities of this collector to the API
func (*Controller) SendMeta ¶ added in v0.2.0
func (c *Controller) SendMeta(meta utils.Metas) (err error)
SendMeta send meta to the API
func (*Controller) SendSchema ¶ added in v0.2.0
func (c *Controller) SendSchema(sourceName string, schema map[string]map[string]*sources.Column) (err error)
SendSchema send the schema to the API
func (*Controller) SendSourcesCapabilities ¶ added in v0.2.0
func (c *Controller) SendSourcesCapabilities(sourceName string, tasks map[string]*utils.TaskDescription) (err error)
SendSourcesCapabilities send the currently supported capabilities of the configured source to the API
func (*Controller) UpdateTasks ¶ added in v0.2.0
func (c *Controller) UpdateTasks(task utils.Task) (err error)
UpdateTasks update the status (and results if needed) of a task that has been executed by sending it to the API
type ControllerConfig ¶
type ControllerConfig struct { BaseURL string `mapstructure:"base_url" json:"base_url"` PollerTicker string `mapstructure:"poller_ticker" json:"poller_ticker"` Worker int `json:"worker"` }
ControllerConfig representation of controller config
type DeMultiplexer ¶ added in v0.2.0
type DeMultiplexer struct {
// contains filtered or unexported fields
}
DeMultiplexer is used to send offset from sink to source
func NewDemultiplexer ¶ added in v0.2.0
func NewDemultiplexer(ins []chan interface{}, out chan interface{}) (demux *DeMultiplexer)
NewDemultiplexer creates a new DeMultiplexer and returns an initialized DeMultiplexer object
type Multiplexer ¶
type Multiplexer struct {
// contains filtered or unexported fields
}
Multiplexer represents the multiplexer of the collector
func NewMultiplexer ¶
func NewMultiplexer(in chan events.LookatchEvent, outs []chan events.LookatchEvent) (multiplexer *Multiplexer)
NewMultiplexer create a new multiplexer