Documentation
Index ¶
- Variables
- func CloseReader(reader *Reader, context string)
- func CloseWriter(writer *Writer, context string)
- func Consume(reader *Reader, consumerHandler func(Message))
- func CreateDialer(config *clowder.BrokerConfig) (*kafka.Dialer, error)
- func CreateSaslMechanism(saslConfig *clowder.KafkaSASLConfig) (sasl.Mechanism, error)
- func CreateTLSConfig(caContents *string) *tls.Config
- func CreateTransport(sasl sasl.Mechanism, tls *tls.Config) *kafka.Transport
- func Produce(writer *Writer, message *Message) error
- type Header
- type Logger
- type Message
- func (message *Message) AddHeaders(headers []Header)
- func (message *Message) AddValue(record []byte)
- func (message *Message) AddValueAsJSON(record interface{}) error
- func (message *Message) GetHeader(name string) string
- func (message *Message) ParseTo(output interface{}) error
- func (message *Message) SetKeyFromHeaders()
- func (message *Message) TranslateHeaders() []Header
- type Options
- type Reader
- type Writer
Constants ¶
This section is empty.
Variables ¶
var (
	// package level sharable kafka variables, safe to use concurrently.
	Dialer        *kafka.Dialer
	Transport     *kafka.Transport
	TlsConfig     *tls.Config
	SaslMechanism sasl.Mechanism
)
Functions ¶
func CloseReader ¶
CloseReader attempts to close the provided reader and logs the error if it fails.
func CloseWriter ¶
CloseWriter attempts to close the provided writer and logs the error if it fails.
func CreateDialer ¶
func CreateDialer(config *clowder.BrokerConfig) (*kafka.Dialer, error)
CreateDialer returns a dialer for the Kafka Go library that includes the TLS configuration and the SASL mechanism needed to connect to the managed Kafka instance.
func CreateSaslMechanism ¶
func CreateSaslMechanism(saslConfig *clowder.KafkaSASLConfig) (sasl.Mechanism, error)
CreateSaslMechanism returns the SASL mechanism that Kafka Go requires for setting up the connection. Currently, the plain, scram-sha-256, and scram-sha-512 mechanisms are supported.
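Selecting among the three supported mechanisms can be pictured as a simple dispatch on the configured name. The sketch below uses only the standard library; the `mechanism` type, the `parseMechanism` function, and the case-insensitive matching are assumptions for illustration and do not reflect how the package builds the actual `sasl.Mechanism`.

```go
package main

import (
	"fmt"
	"strings"
)

// mechanism is a hypothetical enum standing in for a real SASL mechanism.
type mechanism int

const (
	mechUnknown mechanism = iota
	mechPlain
	mechScramSHA256
	mechScramSHA512
)

// parseMechanism maps a configured name to one of the supported mechanisms
// and returns an error for anything else. Matching is case-insensitive,
// which is an assumption of this sketch.
func parseMechanism(name string) (mechanism, error) {
	switch strings.ToLower(name) {
	case "plain":
		return mechPlain, nil
	case "scram-sha-256":
		return mechScramSHA256, nil
	case "scram-sha-512":
		return mechScramSHA512, nil
	default:
		return mechUnknown, fmt.Errorf("unsupported SASL mechanism %q", name)
	}
}

func main() {
	for _, name := range []string{"plain", "SCRAM-SHA-512", "gssapi"} {
		m, err := parseMechanism(name)
		fmt.Println(name, m, err)
	}
}
```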
func CreateTLSConfig ¶
CreateTLSConfig returns a TLS configuration with the minimum TLS version set to 1.2. If caContents is not empty, the provided certificate is added as a trusted certificate.
func CreateTransport ¶
CreateTransport returns a Kafka transport. The transport is memoized because it is safe for concurrent use.
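One common way to memoize a shared value in Go is `sync.Once`. The sketch below shows that technique with a stand-in `transport` type; whether the package itself uses `sync.Once` or another mechanism is not stated in this documentation, so treat every name here as hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// transport is a hypothetical stand-in for *kafka.Transport.
type transport struct{ name string }

var (
	once   sync.Once
	shared *transport
	builds int // counts how many times the transport was constructed
)

// getTransport builds the transport on the first call and returns the same
// instance on every later call, including calls from other goroutines.
func getTransport() *transport {
	once.Do(func() {
		builds++
		shared = &transport{name: "shared"}
	})
	return shared
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getTransport()
		}()
	}
	wg.Wait()
	fmt.Println("constructions:", builds)
}
```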
Types ¶
type Logger ¶
type Logger interface {
	Debugf(msg string, args ...interface{})
	Errorf(msg string, args ...interface{})
}
Logger is a wrapper around the logger methods this package needs.
type Message ¶
type Message kafka.Message
func (*Message) AddHeaders ¶
func (*Message) AddValueAsJSON ¶
func (*Message) SetKeyFromHeaders ¶
func (message *Message) SetKeyFromHeaders()
SetKeyFromHeaders sets the key on the Kafka message from its headers, using this precedence:
1. OrgID: every request should have one of these.
2. EBS Account Number: the fallback.
3. XRHID: used if neither of the above is present, provided the message carries an x-rh-identity header.
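The precedence rule can be sketched as a lookup that returns the first non-empty candidate. The `Header` shape, the header names in the constants, and the `keyFromHeaders` function are all assumptions for illustration; this documentation does not show the package's Header fields or the exact header names it reads.

```go
package main

import "fmt"

// Header is a hypothetical key/value pair; the package's real Header type
// is not shown in this documentation.
type Header struct {
	Key   string
	Value string
}

// Hypothetical header names, listed in order of precedence.
const (
	orgIDHeader    = "org_id"
	accountHeader  = "account_number"
	identityHeader = "x-rh-identity"
)

// keyFromHeaders returns the value of the first header, in precedence
// order, that is present and non-empty. It returns "" if none is found.
func keyFromHeaders(headers []Header) string {
	byName := make(map[string]string, len(headers))
	for _, h := range headers {
		byName[h.Key] = h.Value
	}

	for _, name := range []string{orgIDHeader, accountHeader, identityHeader} {
		if v := byName[name]; v != "" {
			return v
		}
	}
	return ""
}

func main() {
	headers := []Header{
		{Key: identityHeader, Value: "encoded-identity"},
		{Key: accountHeader, Value: "540155"},
	}
	// The account number outranks the identity header when no org ID exists.
	fmt.Println(keyFromHeaders(headers))
}
```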
func (*Message) TranslateHeaders ¶
TranslateHeaders translates a Kafka message's headers from the segmentio/kafka representation to this package's Header type.
type Options ¶
type Options struct {
	// REQUIRED FIELDS
	BrokerConfig []clowder.BrokerConfig
	Topic        string
	// only used for reader, optional.
	GroupID *string
	// logger to pass along
	Logger Logger
}
Options holds the settings used to create a reader or a writer.