Documentation
¶
Index ¶
Constants ¶
View Source
const ( // TopicAppLogTmpl is the Kafka topic name template for LogMessage TopicAppLogTmpl = "app-log-%s" // TopicCFMetric is the Kafka topic name for ValueMetric TopicCFMetric = "cf-metrics" )
View Source
const ( // Default topic name for each event DefaultValueMetricTopic = "value-metric" DefaultLogMessageTopic = "log-message" DefaultKafkaRepartitionMax = 5 DefaultKafkaRetryMax = 1 DefaultKafkaRetryBackoff = 100 * time.Millisecond DefaultChannelBufferSize = 512 DefaultSubInputBufferSize = 1024 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaProducer ¶
type KafkaProducer struct { *kafka.Producer Logger *log.Logger Stats *stats.Stats // contains filtered or unexported fields }
KafkaProducer implements the NozzleProducer interface.
func (*KafkaProducer) Errors ¶
func (kp *KafkaProducer) Errors() <-chan *kafka.Error
func (*KafkaProducer) Produce ¶
func (kp *KafkaProducer) Produce(ctx context.Context, eventCh <-chan *events.Envelope)
Produce produces events to Kafka.
func (*KafkaProducer) ReadDeliveryChan ¶
func (kp *KafkaProducer) ReadDeliveryChan()
func (*KafkaProducer) Successes ¶
func (kp *KafkaProducer) Successes() <-chan *kafka.Message
type LogProducer ¶
LogProducer implements the NozzleProducer interface. This producer is mainly used for debugging purposes.
func (*LogProducer) Close ¶
func (p *LogProducer) Close()
func (*LogProducer) Errors ¶
func (p *LogProducer) Errors() <-chan *kafka.Error
func (*LogProducer) Produce ¶
func (p *LogProducer) Produce(ctx context.Context, eventCh <-chan *events.Envelope)
func (*LogProducer) ReadDeliveryChan ¶
func (p *LogProducer) ReadDeliveryChan()
func (*LogProducer) Successes ¶
func (p *LogProducer) Successes() <-chan *kafka.Message
type NozzleProducer ¶
type NozzleProducer interface { // Produce produces firehose events Produce(context.Context, <-chan *events.Envelope) // Errors returns error channel Errors() <-chan *kafka.Error // Successes returns a receive-only channel of *kafka.Message Successes() <-chan *kafka.Message // Close shuts down the producer and flushes any messages it may have buffered. Close() ReadDeliveryChan() }
func NewKafkaProducer ¶
func NewKafkaProducer(logger *log.Logger, stats *stats.Stats, config *config.Config) (NozzleProducer, error)
NewKafkaProducer creates a new KafkaProducer with the possibility to handle security.
func NewLogProducer ¶
func NewLogProducer(logger *log.Logger) NozzleProducer
Click to show internal directories.
Click to hide internal directories.