Documentation ¶
Overview ¶
Package sender is a set of types that implement skogul.Sender. A Sender in skogul is a simple primitive that receives skogul metrics and "does something with them".
The traditional and obvious sender accepts metrics and uses an external service to persist them. E.g.: the InfluxDB sender stores the metrics in InfluxDB, the postgres sender stores them in Postgres, and so forth.
The other type of sender is "internal", typically used for routing. The classic example is the "dupe" sender, which accepts metrics and passes them on to multiple other senders - e.g.: store to both postgres and influxdb. Another classic is the "fallback" sender: it has a list of senders and tries each one in order until one succeeds, allowing you to send to a primary influxdb normally - if influx fails, write to local disk, and if that fails, write a message to the log.
The only thing a sender "must" do is implement Send(c *skogul.Container), and it must not modify the container in Send(), since multiple senders might be working on it at the same time.
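The contract above can be illustrated with a minimal sketch. The Container and countingSender types here are illustrative stand-ins, not skogul's real definitions; the point is only that Send() reads the container without mutating it:

```go
package main

import "fmt"

// Container stands in for skogul.Container; the real type is
// defined by the skogul package and carries metrics.
type Container struct {
	Metrics []map[string]interface{}
}

// countingSender is a hypothetical sender: it counts metrics
// and does nothing else.
type countingSender struct {
	seen int
}

// Send implements the one required method. It only reads from
// c - it never modifies the container, since other senders may
// be working on it concurrently.
func (s *countingSender) Send(c *Container) error {
	s.seen += len(c.Metrics)
	return nil
}

func main() {
	s := &countingSender{}
	c := &Container{Metrics: []map[string]interface{}{{"x": 1}, {"y": 2}}}
	s.Send(c)
	fmt.Println(s.seen) // 2
}
```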
To make a sender configurable, simply ensure the data types in the type definition can be unmarshalled from JSON. One caveat: for now it is necessary to use "SenderRef" and "HandlerRef" objects instead of Sender and Handler directly. This lets the config engine track references that haven't been resolved yet.
It also means certain data types need to be avoided or worked around. Currently, time.Duration is such an example, as it is missing a JSON unmarshaller. For such data types, a simple wrapper will do the trick, e.g. skogul.Duration wraps time.Duration.
Index ¶
- Variables
- type Backoff
- type Batch
- type Counter
- type Debug
- type Detacher
- type Dupe
- type EnrichmentUpdater
- type ErrDiverter
- type Fallback
- type Fanout
- type File
- type ForwardAndFail
- type HTTP
- type InfluxDB
- type Kafka
- type Log
- type MQTT
- type Match
- type MnR
- type Nats
- type Net
- type Null
- type Rabbitmq
- type SNMP
- type SQL
- type Sleeper
- type Splunk
- type Switch
- type TCP
- type Test
- func (rcv *Test) Received() uint64
- func (rcv *Test) Send(c *skogul.Container) error
- func (rcv *Test) Set(v uint64)
- func (rcv *Test) SetSync(v bool)
- func (rcv *Test) TestNegative(t failer, s skogul.Sender, c *skogul.Container)
- func (rcv *Test) TestQuick(t failer, sender skogul.Sender, c *skogul.Container, received uint64)
- func (rcv *Test) TestSync(t failer, s skogul.Sender, c *skogul.Container, send int, received int)
- func (rcv *Test) TestTime(t failer, s skogul.Sender, c *skogul.Container, received uint64, ...)
Constants ¶
This section is empty.
Variables ¶
var Auto skogul.ModuleMap
Auto maps sender-names to sender implementation, used for auto configuration.
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
type Backoff struct {
    Next    skogul.SenderRef `doc:"The sender to try"`
    Base    skogul.Duration  `doc:"Initial delay after a failure. Will double for each retry"`
    Retries uint64           `doc:"Number of retries before giving up"`
    // contains filtered or unexported fields
}
Backoff sender will send to Next, but retry up to Retries times, with exponential backoff starting at Base and doubling for each retry.
type Batch ¶
type Batch struct {
    Next      skogul.SenderRef `doc:"Sender that will receive batched metrics"`
    Interval  skogul.Duration  `doc:"Flush the bucket after this duration regardless of how full it is. Default is 1s."`
    Threshold int              `doc:"Flush the bucket after reaching this amount of metrics. Default is 10."`
    Threads   int              `doc:"Number of threads for batch sender. Defaults to number of CPU cores."`
    Burner    skogul.SenderRef `` /* 520-byte string literal not displayed */
    // contains filtered or unexported fields
}
Batch sender collects metrics into a single container then passes them on after Threshold number of metrics are collected. In case Threshold is "never" reached, it will periodically flush metrics if no message has been received in Interval time.
Internally, the Batch sender consists of three parts. The first part is the Send() part, which just pushes the received container onto a channel.
The second part, which is a single, dedicated go routine, picks up said container and adds it to a batch-container. When the batch container is "full" (e.g.: exceeds Threshold) - or a timeout is reached - the batch container is pushed onto a second channel and a new, empty, batch container is created.
The third part picks up the ready-to-send containers and issues next.Send() on them. This is a separate go routine, one per NumCPU.
This means that:
- Batch sender will both do a "fan-in" from potentially multiple Send() calls.
- ... and do a fan-out afterwards.
- Send() will only block if two channels are full.
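The three-stage pipeline described above can be sketched with channels. This is a simplification: it omits the Interval timer and runs a single delivery worker instead of one per CPU, and none of these names are skogul's internals:

```go
package main

import "fmt"

// runBatch: stage 1 pushes metrics onto a channel, a single
// goroutine (stage 2) accumulates them into batches and hands
// full batches over, and stage 3 collects the ready batches.
func runBatch(items []int, threshold int) [][]int {
	in := make(chan int, len(items))      // stage 1: what Send() would push to
	ready := make(chan []int, len(items)) // full batches for stage 3

	// Stage 2: single goroutine owning the pending batch.
	go func() {
		batch := []int{}
		for m := range in {
			batch = append(batch, m)
			if len(batch) >= threshold {
				ready <- batch
				batch = []int{}
			}
		}
		if len(batch) > 0 {
			ready <- batch // final flush of a partial batch
		}
		close(ready)
	}()

	for _, m := range items {
		in <- m
	}
	close(in)

	// Stage 3 (one worker here; the real sender runs one per CPU).
	var out [][]int
	for b := range ready {
		out = append(out, b)
	}
	return out
}

func main() {
	fmt.Println(runBatch([]int{0, 1, 2, 3, 4, 5, 6}, 3))
	// prints [[0 1 2] [3 4 5] [6]]
}
```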
type Counter ¶
type Counter struct {
    Next   skogul.SenderRef  `doc:"Reference to the next sender in the chain"`
    Stats  skogul.HandlerRef `doc:"Handler that will receive the stats periodically"`
    Period skogul.Duration   `doc:"How often to emit stats. Defaults to 1 second." example:"5s"`
    // contains filtered or unexported fields
}
Counter sender emits, periodically, the flow-rate of metrics through it. The stats are sent on to the Stats-sender every Period.
To avoid locks and support multiple go routines using the same counter, stats are sent over a channel to a separate goroutine that does the actual aggregation and calculation.
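The lock-free pattern described above looks roughly like this. A simplified sketch, not the Counter sender's actual code: callers only write events to a channel, and one goroutine owns the counter:

```go
package main

import "fmt"

// countViaChannel aggregates events without any locks: every
// "Send()" is just a channel write, and a single goroutine
// does the actual counting.
func countViaChannel(events []int) int {
	ch := make(chan int, len(events))
	done := make(chan int)

	// The one goroutine that owns the counter state.
	go func() {
		total := 0
		for n := range ch {
			total += n
		}
		done <- total
	}()

	for _, n := range events {
		ch <- n // what each Send() call would do
	}
	close(ch)
	return <-done
}

func main() {
	fmt.Println(countViaChannel([]int{2, 2, 2, 2, 2})) // 10
}
```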
type Debug ¶
type Debug struct {
    Prefix  string            `doc:"Prefix to print before any metric"`
    Encoder skogul.EncoderRef `doc:"Which encoder to use. Defaults to prettyjson."`
}
Debug sender simply prints the metrics in json-marshalled format to stdout.
type Detacher ¶
type Detacher struct {
    Next  skogul.SenderRef `doc:"Sender that receives the metrics."`
    Depth int              `doc:"How many containers can be pending delivery before we start blocking. Defaults to 1000."`
    // contains filtered or unexported fields
}
Detacher accepts a message, sends it to a channel, then picks it up on the other end in a separate go routine. This, unfortunately, leads to fan-in: if used in conjunction with HTTP receiver, for example, you end up going from multiple independent go routines to a single one, which is probably not what you want.
The purpose is to smooth out reading.
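The detach pattern can be sketched like this. The types and the use of strings instead of containers are illustrative simplifications:

```go
package main

import (
	"fmt"
	"sync"
)

// detacher sketches the pattern: Send() only enqueues on a
// buffered channel, and a single goroutine drains it and
// calls the next sender - the fan-in noted above.
type detacher struct {
	ch   chan string
	wg   sync.WaitGroup
	next func(string)
}

func newDetacher(depth int, next func(string)) *detacher {
	d := &detacher{ch: make(chan string, depth), next: next}
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for m := range d.ch {
			d.next(m)
		}
	}()
	return d
}

// Send blocks only when the buffer (Depth) is full.
func (d *detacher) Send(m string) { d.ch <- m }

// Close drains the queue and waits for delivery to finish.
func (d *detacher) Close() {
	close(d.ch)
	d.wg.Wait()
}

func main() {
	var got []string
	d := newDetacher(1000, func(m string) { got = append(got, m) })
	d.Send("a")
	d.Send("b")
	d.Close()
	fmt.Println(got) // [a b]
}
```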
type Dupe ¶
type Dupe struct {
Next []*skogul.SenderRef `doc:"List of senders that will receive metrics, in order."`
}
Dupe sender executes all provided senders in turn.
type EnrichmentUpdater ¶ added in v0.15.1
type EnrichmentUpdater struct {
Enricher skogul.TransformerRef `doc:"The enrichment transformer to update."`
}
EnrichmentUpdater sends any received container/metric to the update-function of the provided transformer, allowing on-the-fly updates to enrichment.
func (*EnrichmentUpdater) Send ¶ added in v0.15.1
func (e *EnrichmentUpdater) Send(c *skogul.Container) error
Uses received metrics to update the enrichment transformer
func (*EnrichmentUpdater) Verify ¶ added in v0.15.1
func (e *EnrichmentUpdater) Verify() error
type ErrDiverter ¶
type ErrDiverter struct {
    Next   skogul.SenderRef  `doc:"Send normal metrics here."`
    Err    skogul.HandlerRef `doc:"If the sender under Next fails, convert the error to a metric and send it here."`
    RetErr bool              `` /* 130-byte string literal not displayed */
}
ErrDiverter calls the Next sender, but if it fails, it will convert the error to a Container and send that to Err.
type Fallback ¶
type Fallback struct {
Next []*skogul.SenderRef `doc:"Ordered list of senders that will potentially receive metrics."`
}
Fallback sender tries each provided sender in turn before failing.
E.g.:
    primary := sender.InfluxDB{....}
    secondary := sender.Queue{....} // Not implemented yet
    emergency := sender.Debug{}
    fallback := sender.Fallback{}
    fallback.Add(&primary)
    fallback.Add(&secondary)
    fallback.Add(&emergency)
This will send data to Influx normally. If Influx fails, it will send it to a queue. If that fails, it will print it to stdout.
type Fanout ¶
type Fanout struct {
    Next    skogul.SenderRef `doc:"Sender receiving the metrics"`
    Workers int              `doc:"Number of worker threads in use. To _fan_in_ you can set this to 1."`
    // contains filtered or unexported fields
}
Fanout sender implements a worker pool for passing data on. This SHOULD be unnecessary, as the receiver should ideally do this for us (e.g.: the HTTP receiver does this natively). However, there might be times where it makes sense, especially since this can be used in reverse too: you can use the Fanout sender to limit the degree of concurrency that downstream is exposed to.
Again, this should really not be needed. If you use the fanout sender, be sure you understand why.
The only settings provided are "Next", which provides the next sender, and "Workers", which defines the size of the worker pool.
type File ¶ added in v0.8.0
type File struct {
    Path    string            `doc:"Absolute path to file to write. DEPRECATED - replaced by option File (to keep options more consistent across modules)."`
    File    string            `doc:"Absolute path to file to write to."`
    Append  bool              `` /* 130-byte string literal not displayed */
    Encoder skogul.EncoderRef `doc:"Which encoder to use. Defaults to JSON."`
    // contains filtered or unexported fields
}
File sender writes data to a file in various fashions. Typical uses are debugging (writing to disk) and writing to a FIFO.
The file is created under the path given by the File option.
When a SIGHUP signal is received, the file will be truncated; if the file doesn't exist at that point, it will be created.
When the Append option is set and this sender receives a SIGHUP, data will instead be appended to the file, if the file exists.
func (*File) Deprecated ¶ added in v0.19.0
type ForwardAndFail ¶
ForwardAndFail sender will pass the container to the Next sender, but always returns an error. The use-case for this is to allow the fallback Sender or similar to eventually send data to a sender that ALWAYS works, e.g. the Debug-sender or just printing a message in the log, but we still want to propagate the error upwards in the stack so clients can take appropriate action.
Example use:
    faf := sender.ForwardAndFail{Next: skogul.Debug{}}
    fb := sender.Fallback{Next: []skogul.Sender{influx, faf}}
type HTTP ¶
type HTTP struct {
    URL              string            `doc:"Fully qualified URL to send data to." example:"http://localhost:6081/ https://user:password@[::1]:6082/"`
    Headers          map[string]string `doc:"HTTP headers to be added to every request"`
    Timeout          skogul.Duration   `doc:"HTTP timeout."`
    Insecure         bool              `doc:"Disable TLS certificate validation."`
    ConnsPerHost     int               `doc:"Max concurrent connections per host. Should reflect ulimit -n. Defaults to unlimited."`
    IdleConnsPerHost int               `doc:"Max idle connections retained per host. Should reflect expected concurrency. Defaults to 2 + runtime.NumCPU."`
    RootCA           string            `doc:"Path to an alternate root CA used to verify server certificates. Leave blank to use system defaults."`
    Certfile         string            `doc:"Path to certificate file for TLS Client Certificate."`
    Keyfile          string            `doc:"Path to key file for TLS Client Certificate."`
    Encoder          skogul.EncoderRef `doc:"Encoder to use. Defaults to JSON-encoding."`
    // contains filtered or unexported fields
}
HTTP sender POSTs the Skogul JSON-encoded data to the provided URL.
type InfluxDB ¶
type InfluxDB struct {
    URL                     string          `doc:"URL to InfluxDB API. Must include write end-point and database to write to." example:"http://[::1]:8086/write?db=foo"`
    Measurement             string          `doc:"Measurement name to write to."`
    MeasurementFromMetadata string          `` /* 177-byte string literal not displayed */
    Timeout                 skogul.Duration `doc:"HTTP timeout"`
    ConvertIntToFloat       bool            `doc:"Convert all integers to floats. Don't do this unless you really know why you're doing this."`
    Token                   skogul.Secret   `doc:"Authorization token used in InfluxDB 2.0"`
    // contains filtered or unexported fields
}
InfluxDB posts data to the provided URL and measurement, using the InfluxDB line format over HTTP.
type Kafka ¶ added in v0.16.0
type Kafka struct {
    Topic    string `doc:"Topic to write to."`
    Sync     bool   `doc:"Synchronous or not. By default, the sender is async."`
    Address  string `doc:"Address for the broker."`
    ClientID string `doc:"ClientID to use - uses lower-case skogul by default."`
    TLS      bool   `doc:"Enable TLS, off by default."`
    Username string `doc:"Username for SASL auth."`
    Password string `doc:"Password for SASL auth."`
    Encoder  skogul.EncoderRef
    // contains filtered or unexported fields
}
The Kafka sender is an MVP-variant, and further features are reasonable and expected, including but not limited to:

- Authentication (coming before release)
- Better control of batching, probably
- Dynamic keys from metadata
- Adjustment of various timeouts
type Log ¶
type Log struct {
Message string `doc:"Message to print."`
}
Log sender simply executes log.Print() on a predefined message.
Intended use is in combination with other senders, e.g. to explain WHY sender.Debug() was used.
type MQTT ¶
type MQTT struct {
    Broker   string   `doc:"Address of broker to send to" example:"[::1]:8888"`
    Topics   []string `doc:"Topic(s) to publish events to"`
    Username string   `doc:"MQTT broker authorization username"`
    Password string   `doc:"MQTT broker authorization password"`
    ClientID string   `doc:"Custom client id to use (default: random)"`
    // contains filtered or unexported fields
}
MQTT Sender publishes messages on a MQTT message bus.
FIXME: The MQTT-sender and receiver should be updated to not use the url-encoded scheme.
type Match ¶ added in v0.15.0
type Match struct {
    Conditions []map[string]interface{} `doc:"Array of metadata headers and required values."`
    Next       *skogul.SenderRef        `doc:"Sender to use in case of a match."`
}
Match describes a list of conditions that need to match for a sender to receive metrics.
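A condition check of this kind can be sketched as follows. This is an assumed reading of the semantics - every key in a condition must be present in the metadata with exactly the required value - and the function is illustrative, not skogul's implementation:

```go
package main

import "fmt"

// matches reports whether every key in cond is present in
// metadata with exactly the required value.
func matches(cond, metadata map[string]interface{}) bool {
	for k, want := range cond {
		if metadata[k] != want {
			return false
		}
	}
	return true
}

func main() {
	cond := map[string]interface{}{"router": "foo", "interface": "ae12"}
	// Extra metadata keys don't prevent a match.
	fmt.Println(matches(cond, map[string]interface{}{"router": "foo", "interface": "ae12", "x": 1})) // true
	fmt.Println(matches(cond, map[string]interface{}{"router": "bar"}))                              // false
}
```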
type MnR ¶
type MnR struct {
    Address      string `doc:"Address to send data to" example:"192.168.1.99:1234"`
    DefaultGroup string `doc:"Default group to use if the metadatafield group is missing."`
}
MnR sender writes to M&R port collector.
The output format is:
<timestamp>\t<groupname>\t<variable>\t<value>(\t<property>=<value>)*
Example:
1199145600 group myDevice.Variable1 100 device=myDevice name=MyVariable1
Two special metadata fields can be provided: "group" will set the M&R storage group, and "prefix" will be used to prefix all individual data variables.
E.g.:
    {
        "template": {
            "timestamp": "2019-03-15T11:08:02+01:00",
            "metadata": {
                "server": "somewhere.example.com"
            }
        },
        "metrics": [
            {
                "metadata": {
                    "prefix": "myDevice.",
                    "key": "value",
                    "paramkey": "paramvalue"
                },
                "data": {
                    "astring": "text",
                    "float": 1.11,
                    "integer": 5
                }
            }
        ]
    }
Will result in:
    1552644482 group myDevice.astring text key=value paramkey=paramvalue server=somewhere.example.com
    1552644482 group myDevice.float 1.11 key=value paramkey=paramvalue server=somewhere.example.com
    1552644482 group myDevice.integer 5 key=value paramkey=paramvalue server=somewhere.example.com
The group defaults to the MnR sender's DefaultGroup. If this is unset, the default group is "group". Meaning:

- If metadata provides a "group" key, this is used
- Otherwise, if DefaultGroup is set in the MnR sender, this is used
- Otherwise, "group" is used
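The precedence above can be sketched as a small helper. The function name is illustrative, not skogul's internal API:

```go
package main

import "fmt"

// resolveGroup mirrors the documented precedence: the "group"
// metadata key wins, then the sender's DefaultGroup, then the
// literal fallback "group".
func resolveGroup(metadata map[string]string, defaultGroup string) string {
	if g, ok := metadata["group"]; ok {
		return g
	}
	if defaultGroup != "" {
		return defaultGroup
	}
	return "group"
}

func main() {
	fmt.Println(resolveGroup(map[string]string{"group": "core"}, "edge")) // core
	fmt.Println(resolveGroup(map[string]string{}, "edge"))                // edge
	fmt.Println(resolveGroup(map[string]string{}, ""))                    // group
}
```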
func (*MnR) Send ¶
Send to MnR.
Implementation details: We need to write each value as its own variable to MnR, so we start by constructing two buffers for what comes before and after the key\tvalue, then iterate over m.Data.
Also, we open a new TCP connection for each call to Send() at the moment, which is really suboptimal for large quantities of data, but ok for occasional data dumps. If large metric containers are received, the cost will be negligible. But this should, of course, be fixed in the future.
type Nats ¶ added in v0.21.0
type Nats struct {
    Servers       string   `doc:"Comma separated list of nats URLs"`
    Subject       string   `doc:"Subject to publish messages on"`
    SubjectAppend []string `doc:"Append these Metadata fields to subject"`
    Name          string   `doc:"Client name"`
    Username      string   `doc:"Client username"`
    Password      string   `doc:"Client password"`
    TLSClientKey  string   `doc:"TLS client key file path"`
    TLSClientCert string   `doc:"TLS client cert file path"`
    TLSCACert     string   `doc:"CA cert file path"`
    UserCreds     string   `doc:"Nats credentials file path"`
    NKeyFile      string   `doc:"Nats nkey file path"`
    Insecure      bool     `doc:"TLS InsecureSkipVerify"`
    Encoder       skogul.EncoderRef
    // contains filtered or unexported fields
}
type Net ¶
type Net struct {
    Address string `doc:"Address to send data to" example:"192.168.1.99:1234"`
    Network string `doc:"Network, according to net.Dial. Typically udp or tcp."`
}
Net sends metrics to a network address. FIXME: Use Encoder.
type Rabbitmq ¶ added in v0.24.0
type Rabbitmq struct {
    Username skogul.Secret     `doc:"Username for rabbitmq instance"`
    Password skogul.Secret     `doc:"Password for rabbitmq instance"`
    Host     string            `doc:"Hostname for rabbitmq instance. Fallback is localhost"`
    Port     string            `doc:"Port for rabbitmq instance. Fallback is 5672"`
    Queue    string            `doc:"Queue to write to"`
    Encoder  skogul.EncoderRef `doc:"Encoder to use. Fallback is json"`
    Timeout  int               `doc:"Timeout for rabbitmq instance connection. Fallback is 10 seconds."`
    // contains filtered or unexported fields
}
type SNMP ¶ added in v0.24.0
type SNMP struct {
    Port        uint16                 `doc:"Snmp port"`
    Community   string                 `doc:"Snmp community field"`
    Version     string                 `doc:"Snmp version possible values: 2c, 3"`
    Target      string                 `doc:"Snmp target"`
    Oidmap      map[string]interface{} `doc:"Snmp oid to json field mapping"`
    Timeout     uint                   `doc:"Snmp timeout, default 5 seconds"`
    SnmpTrapOID string                 `doc:"Value of the snmp trap oid pdu"`
    // contains filtered or unexported fields
}
type SQL ¶
type SQL struct {
    ConnStr string `` /* 233-byte string literal not displayed */
    Query   string `` /* 708-byte string literal not displayed */
    Driver  string `doc:"Database driver/system. Currently supported: mysql and postgres."`
    // contains filtered or unexported fields
}
SQL sender connects to an SQL database, currently either MySQL (or MariaDB, I suppose) or Postgres. The connection string for MySQL is specified at https://github.com/go-sql-driver/mysql/ and for Postgres at http://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING.
The query is expanded using os.Expand() and will fill in timestamp/metadata/data. The sender will prep the query and essentially convert INSERT INTO foo VALUES(${timestamp},${metadata.foo},${someData}) to foo("INSERT INTO foo VALUES(?,?,?)", timestamp, foo, someData), so values will be sensibly escaped.
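The os.Expand-based rewrite can be sketched as follows. This is a simplified illustration of the idea, not the sender's actual code (the real sender must also record which value each name refers to, and drivers differ in placeholder syntax):

```go
package main

import (
	"fmt"
	"os"
)

// prepQuery replaces each ${...} variable with a driver
// placeholder and records the variable names in order, so the
// corresponding values can be bound safely at execution time.
func prepQuery(query string) (string, []string) {
	var names []string
	prepped := os.Expand(query, func(name string) string {
		names = append(names, name)
		return "?"
	})
	return prepped, names
}

func main() {
	q, names := prepQuery("INSERT INTO foo VALUES(${timestamp},${metadata.foo},${someData})")
	fmt.Println(q)     // INSERT INTO foo VALUES(?,?,?)
	fmt.Println(names) // [timestamp metadata.foo someData]
}
```

Binding values through placeholders rather than string concatenation is what makes the escaping safe.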
type Sleeper ¶
type Sleeper struct {
    Next     skogul.SenderRef `doc:"Sender that will receive delayed metrics"`
    MaxDelay skogul.Duration  `doc:"The maximum delay we will suffer"`
    Base     skogul.Duration  `doc:"The baseline - or minimum - delay"`
    Verbose  bool             `doc:"If set to true, will log delay durations"`
}
The Sleeper sender injects a random delay between Base and Base+MaxDelay before passing execution over to the Next sender.
The purpose is testing.
type Splunk ¶ added in v0.10.0
type Splunk struct {
    URL           string        `doc:"URL to Splunk HTTP Event Collector (HEC)"`
    Token         skogul.Secret `doc:"Token for HTTP Authorization header for HEC endpoint."`
    Index         string        `doc:"Custom Splunk index to send event to."`
    HostnameField string        `` /* 134-byte string literal not displayed */
    SourceField   string        `doc:"Name of the metadata field with the source. Will fallback to the value set in Source if not found."`
    Source        string        `` /* 138-byte string literal not displayed */
    HTTP          *HTTP         `doc:"HTTP sender options. URL is overwritten from this config, the rest will be HTTP sender defaults unless overridden."`
    // contains filtered or unexported fields
}
Splunk contains the configuration parameters for this sender.
type Switch ¶ added in v0.15.0
type Switch struct {
    Default *skogul.SenderRef   `doc:"Default sender to use if no other match is made. If not specified, metrics are discarded."`
    Map     []Match             `doc:"List of match conditions."`
    Next    []*skogul.SenderRef `doc:"Ordered list of senders that will potentially receive metrics."`
}
Switch sender sends metrics selectively based on metadata.
Example config:
    {
        "type": "switch",
        "map": [
            {
                "conditions": [
                    { "router": "foo", "interface": "ae12" },
                    { "router": "bar", "interface": "ae0" }
                ],
                "next": "customerExportA"
            },
            {
                "conditions": [
                    { "router": "foo", "interface": "ae5" }
                ],
                "next": "customerExportB"
            }
        ],
        "default": "log-no-customer"
    }
type TCP ¶ added in v0.25.0
type TCP struct {
    Address      string            `doc:"Address to send data to" example:"192.168.1.99:1234"`
    MaxRetries   int               `` /* 150-byte string literal not displayed */
    Encoder      skogul.EncoderRef `doc:"Encoder to use. Defaults to JSON-encoding."`
    DialTimeout  skogul.Duration   `doc:"Timeout for dialing. Includes DNS lookups and tcp connect."`
    WriteTimeout skogul.Duration   `doc:"Write timeout. Strongly advised to set this to single-digit seconds."`
    KeepAlive    skogul.Duration   `doc:"Keepalive timer for TCP."`
    Delimiter    []byte            `` /* 128-byte string literal not displayed */
    Threads      int               `` /* 177-byte string literal not displayed */
    // contains filtered or unexported fields
}
TCP sender is optimized around TCP sockets, though it is by no means perfect. Ideally, use a properly stateful protocol like HTTP.
The main issue with the TCP sender is buffering and error detection. It is difficult to be both performant and detect errors as they happen, and this isn't made easier by the abstractions of Go.
As such, this sender works well, but does not offer a guarantee that errors are actually detected when they happen.
type Test ¶
type Test struct {
// contains filtered or unexported fields
}
Test sender is used to facilitate tests, and discards any metrics, but increments the Received counter.
func (*Test) SetSync ¶
SetSync sets the tester up for synchronized testing. Probably should be the default from now on....
func (*Test) TestNegative ¶
TestNegative sends data on s and expects to fail.
func (*Test) TestQuick ¶
TestQuick sends data on the sender and waits 5 milliseconds before checking that the data was received on the other end.