kafka

package
v0.0.0-...-4623b9b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2020 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TopicAppLogTmpl is the Kafka topic name template for LogMessage events.
	// It contains a single %s verb that the caller must fill in.
	// NOTE(review): presumably the verb receives an application identifier —
	// confirm against the code that formats this template.
	TopicAppLogTmpl = "app-log-%s"

	// TopicCFMetric is the Kafka topic name for ValueMetric events.
	TopicCFMetric = "cf-metrics"
)
View Source
const (
	// Default Kafka topic names, one per event type (ValueMetric and LogMessage).
	DefaultValueMetricTopic = "value-metric"
	DefaultLogMessageTopic  = "log-message"

	// Default Kafka tuning values: a repartition maximum of 5, a retry maximum
	// of 1 and a retry backoff of 100ms.
	// NOTE(review): how these are applied to the producer is not visible
	// here — confirm in the producer configuration code.
	DefaultKafkaRepartitionMax = 5
	DefaultKafkaRetryMax       = 1
	DefaultKafkaRetryBackoff   = 100 * time.Millisecond

	// Default buffer sizes (512 and 1024).
	// NOTE(review): presumably channel capacities counted in elements rather
	// than bytes — confirm where the channels are created.
	DefaultChannelBufferSize  = 512
	DefaultSubInputBufferSize = 1024
)

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaProducer

// KafkaProducer is the producer type returned (as a NozzleProducer) by
// NewKafkaProducer. It embeds *kafka.Producer, so that type's exported
// methods are promoted onto KafkaProducer.
// NOTE(review): NozzleProducer requires Close, which is not listed among
// KafkaProducer's own methods; presumably it is promoted from the embedded
// *kafka.Producer — confirm.
type KafkaProducer struct {
	// Embedded Kafka client.
	*kafka.Producer

	// Logger and Stats are exported so callers can read or replace them.
	// NOTE(review): presumably Logger receives diagnostics and Stats collects
	// counters — confirm against the method bodies.
	Logger *log.Logger
	Stats  *stats.Stats
	// contains filtered or unexported fields
}

KafkaProducer implements the NozzleProducer interface.

func (*KafkaProducer) Errors

func (kp *KafkaProducer) Errors() <-chan *kafka.Error

func (*KafkaProducer) Produce

func (kp *KafkaProducer) Produce(ctx context.Context, eventCh <-chan *events.Envelope)

Produce produces events to Kafka.

func (*KafkaProducer) ReadDeliveryChan

func (kp *KafkaProducer) ReadDeliveryChan()

func (*KafkaProducer) Successes

func (kp *KafkaProducer) Successes() <-chan *kafka.Message

type LogProducer

// LogProducer implements the NozzleProducer interface and is mainly used
// for debugging.
type LogProducer struct {
	// Logger is the only exported field.
	// NOTE(review): presumably events are written to it instead of being
	// sent to Kafka — confirm in Produce.
	Logger *log.Logger
	// contains filtered or unexported fields
}

LogProducer implements the NozzleProducer interface. This producer is mainly used for debugging purposes.

func (*LogProducer) Close

func (p *LogProducer) Close()

func (*LogProducer) Errors

func (p *LogProducer) Errors() <-chan *kafka.Error

func (*LogProducer) Produce

func (p *LogProducer) Produce(ctx context.Context, eventCh <-chan *events.Envelope)

func (*LogProducer) ReadDeliveryChan

func (p *LogProducer) ReadDeliveryChan()

func (*LogProducer) Successes

func (p *LogProducer) Successes() <-chan *kafka.Message

type NozzleProducer

// NozzleProducer is the interface returned by NewKafkaProducer and
// NewLogProducer. It consumes firehose events and exposes channels for
// errors and successfully handled messages.
type NozzleProducer interface {
	// Produce produces the firehose events received on the given channel.
	Produce(context.Context, <-chan *events.Envelope)

	// Errors returns a receive-only channel of *kafka.Error values.
	Errors() <-chan *kafka.Error

	// Successes returns a receive-only channel of *kafka.Message values.
	Successes() <-chan *kafka.Message

	// Close shuts down the producer and flushes any messages it may have buffered.
	Close()

	// ReadDeliveryChan takes no arguments and returns nothing.
	// NOTE(review): presumably it drains the producer's delivery reports —
	// confirm against the implementations.
	ReadDeliveryChan()
}

func NewKafkaProducer

func NewKafkaProducer(logger *log.Logger, stats *stats.Stats, config *config.Config) (NozzleProducer, error)

NewKafkaProducer creates a new KafkaProducer with the possibility to handle security settings.

func NewLogProducer

func NewLogProducer(logger *log.Logger) NozzleProducer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳