partition

package
v3.5.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2025 License: AGPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetGroupLag added in v3.4.0

func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error)

GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants. Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.

The lag is the difference between the last produced offset (high watermark) and an offset in the "past". If the block builder committed an offset for a given partition to the consumer group at least once, then the lag is the difference between the last produced offset and the offset committed in the consumer group. Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is running for the first time), then the lag is the difference between the last produced offset and the partition offset corresponding to the fallbackOffsetMillis timestamp.

Types

type Committer

type Committer interface {
	Commit(ctx context.Context, offset int64) error
	EnqueueOffset(offset int64)
}

Committer defines an interface for committing offsets.

type Consumer

type Consumer interface {
	Start(ctx context.Context, recordsChan <-chan []Record) func()
}

type ConsumerFactory

type ConsumerFactory func(committer Committer, logger log.Logger) (Consumer, error)

type KafkaOffsetManager added in v3.4.0

type KafkaOffsetManager struct {
	// contains filtered or unexported fields
}

func NewKafkaOffsetManager added in v3.4.0

func NewKafkaOffsetManager(
	cfg kafka.Config,
	instanceID string,
	logger log.Logger,
	reg prometheus.Registerer,
) (*KafkaOffsetManager, error)

func (*KafkaOffsetManager) Commit added in v3.4.0

func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error

Commit commits an offset to the consumer group

func (*KafkaOffsetManager) ConsumerGroup added in v3.4.0

func (r *KafkaOffsetManager) ConsumerGroup() string

func (*KafkaOffsetManager) FetchLastCommittedOffset added in v3.4.0

func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error)

FetchLastCommittedOffset retrieves the last committed offset for this partition

func (*KafkaOffsetManager) FetchPartitionOffset added in v3.4.0

func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error)

FetchPartitionOffset retrieves the offset for a specific position

func (*KafkaOffsetManager) GroupLag added in v3.4.0

func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)

GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.

func (*KafkaOffsetManager) Topic added in v3.4.0

func (r *KafkaOffsetManager) Topic() string

Topic returns the topic being read

type KafkaReader added in v3.4.0

type KafkaReader struct {
	// contains filtered or unexported fields
}

KafkaReader provides low-level access to Kafka partition reading operations

func NewKafkaReader added in v3.4.0

func NewKafkaReader(
	cfg kafka.Config,
	partitionID int32,
	logger log.Logger,
	metrics *ReaderMetrics,
) (*KafkaReader, error)

func (*KafkaReader) Partition added in v3.4.0

func (r *KafkaReader) Partition() int32

Partition returns the partition being read

func (*KafkaReader) Poll added in v3.4.0

func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error)

Poll retrieves the next batch of records from Kafka. The number of records fetched can be limited by configuring maxPollRecords to a non-zero value.

func (*KafkaReader) SetOffsetForConsumption added in v3.4.0

func (r *KafkaReader) SetOffsetForConsumption(offset int64)

func (*KafkaReader) SetPhase added in v3.4.0

func (r *KafkaReader) SetPhase(phase string)

SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader. For example, we can use this to differentiate between the startup phase and the running phase.

func (*KafkaReader) Topic added in v3.4.0

func (r *KafkaReader) Topic() string

Topic returns the topic being read

type Lag added in v3.4.0

type Lag struct {
	// contains filtered or unexported fields
}

Lag holds partition-level metadata in a more easily digestible form than what Kafka provides.

func NewLag added in v3.4.0

func NewLag(startOffset, endOffset, committedOffset, rawLag int64) Lag

func (Lag) FirstUncommittedOffset added in v3.4.0

func (l Lag) FirstUncommittedOffset() int64

FirstUncommittedOffset returns the first offset that has not yet been committed

func (Lag) Lag added in v3.4.0

func (l Lag) Lag() int64

Lag returns the difference between the last produced offset and the first uncommitted (but available) offset.

func (Lag) LastCommittedOffset added in v3.4.0

func (l Lag) LastCommittedOffset() int64

func (Lag) NextAvailableOffset added in v3.4.0

func (l Lag) NextAvailableOffset() int64

NextAvailableOffset returns the next unwritten offset in a partition, i.e. the end offset (exclusive)

type OffsetManager added in v3.4.0

type OffsetManager interface {
	Topic() string
	ConsumerGroup() string

	// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
	GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
	FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
	FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
	Commit(ctx context.Context, partition int32, offset int64) error
}

type Reader

type Reader interface {
	Topic() string
	Partition() int32
	Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
	// Set the target offset for consumption. reads will begin from here.
	SetOffsetForConsumption(offset int64)
	// SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader.
	// For example, we can use this to differentiate between the startup phase and the running phase.
	SetPhase(phase string)
}

type ReaderConfig added in v3.4.0

type ReaderConfig struct {
	MaxConsumerLagAtStartup       time.Duration
	ConsumerGroupOffsetCommitFreq time.Duration
}

type ReaderMetrics added in v3.4.0

type ReaderMetrics struct {
	// contains filtered or unexported fields
}

ReaderMetrics contains metrics specific to Kafka reading operations

func NewReaderMetrics added in v3.4.0

func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics

type ReaderService added in v3.4.0

type ReaderService struct {
	services.Service
	// contains filtered or unexported fields
}

func NewReaderService added in v3.4.0

func NewReaderService(
	kafkaCfg kafka.Config,
	partitionID int32,
	instanceID string,
	consumerFactory ConsumerFactory,
	logger log.Logger,
	reg prometheus.Registerer,
) (*ReaderService, error)

NewReaderService mimics the `NewReader` constructor but builds a reader service using a reader.

type Record

type Record struct {
	// Context holds the tracing (and potentially other) info, that the record was enriched with on fetch from Kafka.
	Ctx      context.Context
	TenantID string
	Content  []byte
	Offset   int64
}

type SpecialOffset added in v3.4.0

type SpecialOffset int
const (
	KafkaStartOffset SpecialOffset = -2
	KafkaEndOffset   SpecialOffset = -1
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳