limits

package
v3.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2025 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Ring
	RingKey  = "ingest-limits"
	RingName = "ingest-limits"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Enabled enables the ingest limits service.
	Enabled bool `yaml:"enabled"`

	// WindowSize defines the time window for which stream metadata is considered active.
	// Stream metadata older than WindowSize will be evicted from the metadata map.
	WindowSize time.Duration `yaml:"window_size"`

	// RateWindow defines the time window for rate calculation.
	// This should match the window used in Prometheus rate() queries for consistency,
	// when using the `loki_ingest_limits_ingested_bytes_total` metric.
	// Defaults to 5 minutes if not specified.
	RateWindow time.Duration `yaml:"rate_window"`

	// BucketDuration defines the granularity of time buckets used for sliding window rate calculation.
	// Smaller buckets provide more precise rate tracking but require more memory.
	// Defaults to 1 minute if not specified.
	BucketDuration time.Duration `yaml:"bucket_duration"`

	// LifecyclerConfig is the config to build a ring lifecycler.
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

	KafkaConfig kafka.Config `yaml:"-"`

	// The number of partitions for the Kafka topic used to read and write stream metadata.
	// It is fixed, not a maximum.
	NumPartitions int `yaml:"num_partitions"`
}

Config represents the configuration for the ingest limits service.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

type Evictable

type Evictable interface {
	Evict(context.Context) error
}

type Evictor

type Evictor struct {
	// contains filtered or unexported fields
}

Evictor runs scheduled evictions.

func NewEvictor

func NewEvictor(ctx context.Context, interval time.Duration, target Evictable, logger log.Logger) (*Evictor, error)

NewEvictor returns a new evictor over the interval.

func (*Evictor) Run

func (e *Evictor) Run() error

Runs the scheduler loop until the context is canceled.

type IngestLimits

type IngestLimits struct {
	services.Service
	// contains filtered or unexported fields
}

IngestLimits is a service that manages stream metadata limits.

func NewIngestLimits

func NewIngestLimits(cfg Config, logger log.Logger, reg prometheus.Registerer) (*IngestLimits, error)

NewIngestLimits creates a new IngestLimits service. It initializes the metadata map and sets up a Kafka client The client is configured to consume stream metadata from a dedicated topic with the metadata suffix.

func (*IngestLimits) CheckReady

func (s *IngestLimits) CheckReady(ctx context.Context) error

func (*IngestLimits) Collect

func (s *IngestLimits) Collect(m chan<- prometheus.Metric)

func (*IngestLimits) Describe

func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc)

func (*IngestLimits) Flush

func (s *IngestLimits) Flush()

Flush implements ring.FlushTransferer. It transfers state to another ingest limits instance.

func (*IngestLimits) GetAssignedPartitions

GetAssignedPartitions implements the logproto.IngestLimitsServer interface. It returns the partitions that the tenant is assigned to and the instance still owns.

func (*IngestLimits) GetStreamUsage

GetStreamUsage implements the logproto.IngestLimitsServer interface. It returns the number of active streams for a tenant and the status of requested streams.

func (*IngestLimits) ServeHTTP

func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the http.Handler interface. It returns the current stream counts and status per tenant as a JSON response.

func (*IngestLimits) TransferOut

func (s *IngestLimits) TransferOut(_ context.Context) error

TransferOut implements ring.FlushTransferer. It transfers state to another ingest limits instance.

type PartitionManager

type PartitionManager struct {
	// contains filtered or unexported fields
}

PartitionManager keeps track of the partitions assigned and for each partition a timestamp of when it was last updated.

func NewPartitionManager

func NewPartitionManager(logger log.Logger) *PartitionManager

NewPartitionManager returns a new PartitionManager.

func (*PartitionManager) Assign

func (m *PartitionManager) Assign(_ context.Context, _ *kgo.Client, partitions map[string][]int32)

Assign assigns the partitions and sets the last updated timestamp for each partition to the current time.

func (*PartitionManager) Has

func (m *PartitionManager) Has(partitionID int32) bool

Has returns true if the partition is assigned, otherwise false.

func (*PartitionManager) List

func (m *PartitionManager) List() map[int32]int64

List returns a map of all assigned partitions and their last updated timestamps.

func (*PartitionManager) Remove

func (m *PartitionManager) Remove(_ context.Context, _ *kgo.Client, partitions map[string][]int32)

Directories

Path Synopsis
Package client provides gRPC client implementation for limits service.
Package client provides gRPC client implementation for limits service.
Package frontend contains provides a frontend service for ingest limits.
Package frontend contains provides a frontend service for ingest limits.
client
Package client provides gRPC client implementation for limits-frontend.
Package client provides gRPC client implementation for limits-frontend.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳