Documentation ¶
Index ¶
- func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, ...) (kadm.GroupLag, error)
- type Committer
- type Consumer
- type ConsumerFactory
- type KafkaOffsetManager
- func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error
- func (r *KafkaOffsetManager) ConsumerGroup() string
- func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error)
- func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error)
- func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
- func (r *KafkaOffsetManager) Topic() string
- type KafkaReader
- type Lag
- type OffsetManager
- type Reader
- type ReaderConfig
- type ReaderMetrics
- type ReaderService
- type Record
- type SpecialOffset
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetGroupLag ¶ added in v3.4.0
func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error)
GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
The lag is the difference between the last produced offset (high watermark) and an offset in the "past". If the block builder committed an offset for a given partition to the consumer group at least once, then the lag is the difference between the last produced offset and the offset committed in the consumer group. Otherwise, if the block builder hasn't committed an offset for a given partition yet (e.g. the block builder is running for the first time), then the lag is the difference between the last produced offset and the offset corresponding to the fallbackOffsetMillis timestamp.
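The two cases above can be sketched with a small helper. All names here (`lagFor`, `highWatermark`, `fallbackOffset`) are illustrative, not part of this package, and the fallback offset is assumed to have already been resolved from fallbackOffsetMillis by a timestamp lookup on the broker:

```go
package main

import "fmt"

// lagFor computes a partition's lag following the rule described above.
// highWatermark is the last produced offset; committed is the offset
// committed to the consumer group, or -1 if the group has never committed
// for this partition; fallbackOffset is the offset resolved from
// fallbackOffsetMillis. All names are illustrative.
func lagFor(highWatermark, committed, fallbackOffset int64) int64 {
	if committed >= 0 {
		// The group committed at least once: lag relative to the commit.
		return highWatermark - committed
	}
	// No commit yet (e.g. first run): lag relative to the fallback offset.
	return highWatermark - fallbackOffset
}

func main() {
	fmt.Println(lagFor(100, 40, 0))  // committed case: 100 - 40 = 60
	fmt.Println(lagFor(100, -1, 90)) // fallback case: 100 - 90 = 10
}
```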
Types ¶
type Committer ¶
type Committer interface {
	Commit(ctx context.Context, offset int64) error
	EnqueueOffset(offset int64)
}
Committer defines an interface for committing offsets.
type ConsumerFactory ¶
type KafkaOffsetManager ¶ added in v3.4.0
type KafkaOffsetManager struct {
// contains filtered or unexported fields
}
func NewKafkaOffsetManager ¶ added in v3.4.0
func NewKafkaOffsetManager(
	cfg kafka.Config,
	instanceID string,
	logger log.Logger,
	reg prometheus.Registerer,
) (*KafkaOffsetManager, error)
func (*KafkaOffsetManager) ConsumerGroup ¶ added in v3.4.0
func (r *KafkaOffsetManager) ConsumerGroup() string
func (*KafkaOffsetManager) FetchLastCommittedOffset ¶ added in v3.4.0
func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error)
FetchLastCommittedOffset retrieves the last committed offset for this partition.
func (*KafkaOffsetManager) FetchPartitionOffset ¶ added in v3.4.0
func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error)
FetchPartitionOffset retrieves the offset for a specific position.
func (*KafkaOffsetManager) GroupLag ¶ added in v3.4.0
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
GroupLag returns the lag for the consumer group. It uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
func (*KafkaOffsetManager) Topic ¶ added in v3.4.0
func (r *KafkaOffsetManager) Topic() string
Topic returns the topic being read.
type KafkaReader ¶ added in v3.4.0
type KafkaReader struct {
// contains filtered or unexported fields
}
KafkaReader provides low-level access to Kafka partition reading operations.
func NewKafkaReader ¶ added in v3.4.0
func NewKafkaReader(
	cfg kafka.Config,
	partitionID int32,
	logger log.Logger,
	metrics *ReaderMetrics,
) (*KafkaReader, error)
func (*KafkaReader) Partition ¶ added in v3.4.0
func (r *KafkaReader) Partition() int32
Partition returns the partition being read.
func (*KafkaReader) Poll ¶ added in v3.4.0
func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
Poll retrieves the next batch of records from Kafka. The number of records fetched can be limited by setting maxPollRecords to a non-zero value.
func (*KafkaReader) SetOffsetForConsumption ¶ added in v3.4.0
func (r *KafkaReader) SetOffsetForConsumption(offset int64)
func (*KafkaReader) SetPhase ¶ added in v3.4.0
func (r *KafkaReader) SetPhase(phase string)
SetPhase sets the phase for the reader. This is used to differentiate between phases of operation, for example the startup phase and the running phase.
func (*KafkaReader) Topic ¶ added in v3.4.0
func (r *KafkaReader) Topic() string
Topic returns the topic being read.
type Lag ¶ added in v3.4.0
type Lag struct {
// contains filtered or unexported fields
}
Lag is partition-level metadata in a more easily digestible form than what Kafka provides.
func (Lag) FirstUncommittedOffset ¶ added in v3.4.0
FirstUncommittedOffset returns the first offset that has not yet been committed.
func (Lag) Lag ¶ added in v3.4.0
Lag returns the difference between the last produced offset and the first uncommitted (but available) offset.
func (Lag) LastCommittedOffset ¶ added in v3.4.0
LastCommittedOffset returns the last offset committed to the consumer group for this partition.
func (Lag) NextAvailableOffset ¶ added in v3.4.0
NextAvailableOffset returns the next unwritten offset in a partition, i.e. the end offset (exclusive).
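The relationship between these accessors can be modeled as follows. The real Lag type keeps its fields unexported, so the struct below is only an illustrative stand-in, and the assumption that FirstUncommittedOffset is the last committed offset plus one is inferred from the documentation, not from the package's source:

```go
package main

import "fmt"

// lag models the documented accessors of the package's Lag type with
// illustrative fields; the real type's internals are unexported.
type lag struct {
	lastCommitted int64 // last offset committed to the consumer group
	nextAvailable int64 // end offset (exclusive): the next unwritten offset
}

func (l lag) LastCommittedOffset() int64 { return l.lastCommitted }

// FirstUncommittedOffset is assumed here to be one past the last commit.
func (l lag) FirstUncommittedOffset() int64 { return l.lastCommitted + 1 }

func (l lag) NextAvailableOffset() int64 { return l.nextAvailable }

// Lag is the distance between the next unwritten offset and the first
// offset the group has not yet committed.
func (l lag) Lag() int64 { return l.NextAvailableOffset() - l.FirstUncommittedOffset() }

func main() {
	l := lag{lastCommitted: 41, nextAvailable: 100}
	fmt.Println(l.FirstUncommittedOffset(), l.Lag()) // 42 58
}
```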
type OffsetManager ¶ added in v3.4.0
type OffsetManager interface {
	Topic() string
	ConsumerGroup() string

	// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to
	// calculate the lag if the consumer group has no commits.
	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
	FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
	FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
	Commit(ctx context.Context, partition int32, offset int64) error
}
type Reader ¶
type Reader interface {
	Topic() string
	Partition() int32
	Poll(ctx context.Context, maxPollRecords int) ([]Record, error)

	// SetOffsetForConsumption sets the target offset for consumption.
	// Reads will begin from here.
	SetOffsetForConsumption(offset int64)

	// SetPhase sets the phase for the reader. This is used to differentiate between
	// different phases of the reader, for example the startup phase and the running phase.
	SetPhase(phase string)
}
type ReaderConfig ¶ added in v3.4.0
type ReaderMetrics ¶ added in v3.4.0
type ReaderMetrics struct {
// contains filtered or unexported fields
}
ReaderMetrics contains metrics specific to Kafka reading operations.
func NewReaderMetrics ¶ added in v3.4.0
func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics
type ReaderService ¶ added in v3.4.0
type ReaderService struct {
	// contains filtered or unexported fields
}
func NewReaderService ¶ added in v3.4.0
func NewReaderService(
	kafkaCfg kafka.Config,
	partitionID int32,
	instanceID string,
	consumerFactory ConsumerFactory,
	logger log.Logger,
	reg prometheus.Registerer,
) (*ReaderService, error)
NewReaderService mimics the `NewReader` constructor but builds a reader service using a reader.
type SpecialOffset ¶ added in v3.4.0
type SpecialOffset int
const (
	KafkaStartOffset SpecialOffset = -2
	KafkaEndOffset   SpecialOffset = -1
)
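These sentinel values follow the conventional Kafka ListOffsets semantics (-2 requests the earliest available offset, -1 the latest). The `resolve` helper below is hypothetical, only illustrating those semantics; in practice the broker performs the resolution when a SpecialOffset is passed to FetchPartitionOffset:

```go
package main

import "fmt"

type SpecialOffset int

const (
	KafkaStartOffset SpecialOffset = -2 // earliest available offset
	KafkaEndOffset   SpecialOffset = -1 // next offset to be produced
)

// resolve maps a SpecialOffset to a concrete offset for a partition with
// the given start and end offsets. This is an illustration only; the real
// lookup is done by the broker.
func resolve(pos SpecialOffset, start, end int64) int64 {
	switch pos {
	case KafkaStartOffset:
		return start
	case KafkaEndOffset:
		return end
	default:
		// Non-sentinel values are already concrete offsets.
		return int64(pos)
	}
}

func main() {
	fmt.Println(resolve(KafkaStartOffset, 10, 100)) // 10
	fmt.Println(resolve(KafkaEndOffset, 10, 100))   // 100
}
```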