Documentation
Overview
Package kafka provides encoding and decoding functionality for Loki's Kafka integration.
Constants
const (
    // ProducerBatchMaxBytes is the max allowed size of a batch of Kafka records.
    ProducerBatchMaxBytes = 16_000_000
)
Variables
var (
    ErrMissingKafkaAddress                 = errors.New("the Kafka address has not been configured")
    ErrMissingKafkaTopic                   = errors.New("the Kafka topic has not been configured")
    ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
    ErrInvalidProducerMaxRecordSizeBytes   = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
)
Functions
func Encode
func Encode(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error)
Encode converts a logproto.Stream into one or more Kafka records. It handles splitting large streams into multiple records if necessary.
The encoding process works as follows:
1. If the stream size is smaller than maxSize, it's encoded into a single record.
2. For larger streams, it splits the entries into multiple batches, each under maxSize.
3. The data is wrapped in a Kafka record with the tenant ID as the key.
The format of each record is:
- Key: Tenant ID (used for routing, not for partitioning)
- Value: Protobuf serialized logproto.Stream
- Partition: As specified in the partitionID parameter
Parameters:
- partitionID: The Kafka partition ID for the record
- tenantID: The tenant ID for the stream
- stream: The logproto.Stream to be encoded
- maxSize: The maximum size of each Kafka record
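A minimal usage sketch; the sample stream and tenant are illustrative, and the import paths assume Loki's v3 module layout:

import (
    "time"

    "github.com/grafana/loki/v3/pkg/kafka"
    "github.com/grafana/loki/v3/pkg/logproto"
)

func produceStream() error {
    stream := logproto.Stream{
        Labels: `{app="example"}`,
        Entries: []logproto.Entry{
            {Timestamp: time.Now(), Line: "hello world"},
        },
    }
    // Encode the stream for partition 0 of tenant "tenant-1", capping each
    // record at the package's batch limit (any byte budget works here).
    records, err := kafka.Encode(0, "tenant-1", stream, kafka.ProducerBatchMaxBytes)
    if err != nil {
        return err
    }
    for _, record := range records {
        _ = record // hand each *kgo.Record to a franz-go producer
    }
    return nil
}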
Types
type Config
type Config struct {
    Address                           string         `yaml:"address"`
    Topic                             string         `yaml:"topic"`
    ClientID                          string         `yaml:"client_id"`
    DialTimeout                       time.Duration  `yaml:"dial_timeout"`
    WriteTimeout                      time.Duration  `yaml:"write_timeout"`
    SASLUsername                      string         `yaml:"sasl_username"`
    SASLPassword                      flagext.Secret `yaml:"sasl_password"`
    ConsumerGroup                     string         `yaml:"consumer_group"`
    ConsumerGroupOffsetCommitInterval time.Duration  `yaml:"consumer_group_offset_commit_interval"`
    LastProducedOffsetRetryTimeout    time.Duration  `yaml:"last_produced_offset_retry_timeout"`
    AutoCreateTopicEnabled            bool           `yaml:"auto_create_topic_enabled"`
    AutoCreateTopicDefaultPartitions  int            `yaml:"auto_create_topic_default_partitions"`
    ProducerMaxRecordSizeBytes        int            `yaml:"producer_max_record_size_bytes"`
    ProducerMaxBufferedBytes          int64          `yaml:"producer_max_buffered_bytes"`
    MaxConsumerLagAtStartup           time.Duration  `yaml:"max_consumer_lag_at_startup"`
}
Config holds the generic config for the Kafka backend.
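A sketch of populating the config programmatically; the values are illustrative, and in practice defaults are normally set via RegisterFlags:

cfg := kafka.Config{
    Address:      "localhost:9092",
    Topic:        "loki",
    ClientID:     "loki-writer",
    DialTimeout:  10 * time.Second,
    WriteTimeout: 10 * time.Second,
}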
func (*Config) GetConsumerGroup
func (cfg *Config) GetConsumerGroup(instanceID string) string
GetConsumerGroup returns the consumer group to use for the given instanceID.
func (*Config) RegisterFlags
func (cfg *Config) RegisterFlags(f *flag.FlagSet)
func (*Config) RegisterFlagsWithPrefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
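A sketch of wiring the config into a standard flag set; the resulting flag names are an assumption derived from the prefix and the field names, not confirmed by this page:

var cfg kafka.Config
fs := flag.NewFlagSet("loki", flag.ContinueOnError)
cfg.RegisterFlagsWithPrefix("kafka", fs)
// Flag names such as -kafka.address and -kafka.topic are assumed from the
// prefix and the yaml tags above; check the registered flags for exact names.
if err := fs.Parse([]string{"-kafka.address", "localhost:9092"}); err != nil {
    // handle the parse error
}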
type Decoder
type Decoder struct {
    // contains filtered or unexported fields
}
Decoder is responsible for decoding Kafka record data back into logproto.Stream format. It caches parsed labels for efficiency.
func NewDecoder
func NewDecoder() (*Decoder, error)
func (*Decoder) Decode
func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error)
Decode converts a Kafka record's byte data back into a logproto.Stream and labels.Labels.
The decoding process works as follows:
1. Unmarshal the data into a logproto.Stream.
2. Parse and cache the labels for efficiency in future decodes.
Returns the decoded logproto.Stream, parsed labels, and any error encountered.
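A sketch of decoding a consumed record, where kgo is github.com/twmb/franz-go/pkg/kgo and the surrounding consumer loop is assumed:

func handleRecord(dec *kafka.Decoder, record *kgo.Record) error {
    stream, lbls, err := dec.Decode(record.Value)
    if err != nil {
        return err
    }
    _ = stream // the reconstructed logproto.Stream
    _ = lbls   // the parsed labels.Labels, cached for future decodes
    return nil
}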
func (*Decoder) DecodeWithoutLabels
func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error)
DecodeWithoutLabels converts a Kafka record's byte data back into a logproto.Stream without parsing labels.
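For hot paths that never touch parsed labels, a variant of the sketch above (reusing dec and record; presumably the unparsed label string remains available on the stream itself):

stream, err := dec.DecodeWithoutLabels(record.Value)
if err != nil {
    return err
}
_ = stream // entries are available; no labels.Labels parsing is performed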