Documentation
¶
Index ¶
- Constants
- type Config
- type Evictable
- type Evictor
- type IngestLimits
- func (s *IngestLimits) CheckReady(ctx context.Context) error
- func (s *IngestLimits) Collect(m chan<- prometheus.Metric)
- func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc)
- func (s *IngestLimits) Flush()
- func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *logproto.GetAssignedPartitionsRequest) (*logproto.GetAssignedPartitionsResponse, error)
- func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStreamUsageRequest) (*logproto.GetStreamUsageResponse, error)
- func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (s *IngestLimits) TransferOut(_ context.Context) error
- type PartitionManager
- func (m *PartitionManager) Assign(_ context.Context, _ *kgo.Client, partitions map[string][]int32)
- func (m *PartitionManager) Has(partitionID int32) bool
- func (m *PartitionManager) List() map[int32]int64
- func (m *PartitionManager) Remove(_ context.Context, _ *kgo.Client, partitions map[string][]int32)
Constants ¶
const ( // Ring RingKey = "ingest-limits" RingName = "ingest-limits" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Enabled enables the ingest limits service. Enabled bool `yaml:"enabled"` // WindowSize defines the time window for which stream metadata is considered active. // Stream metadata older than WindowSize will be evicted from the metadata map. WindowSize time.Duration `yaml:"window_size"` // RateWindow defines the time window for rate calculation. // This should match the window used in Prometheus rate() queries for consistency, // when using the `loki_ingest_limits_ingested_bytes_total` metric. // Defaults to 5 minutes if not specified. RateWindow time.Duration `yaml:"rate_window"` // BucketDuration defines the granularity of time buckets used for sliding window rate calculation. // Smaller buckets provide more precise rate tracking but require more memory. // Defaults to 1 minute if not specified. BucketDuration time.Duration `yaml:"bucket_duration"` // LifecyclerConfig is the config to build a ring lifecycler. LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` KafkaConfig kafka.Config `yaml:"-"` // The number of partitions for the Kafka topic used to read and write stream metadata. // It is fixed, not a maximum. NumPartitions int `yaml:"num_partitions"` }
Config represents the configuration for the ingest limits service.
func (*Config) RegisterFlags ¶
type Evictor ¶
type Evictor struct {
// contains filtered or unexported fields
}
Evictor runs scheduled evictions.
type IngestLimits ¶
IngestLimits is a service that manages stream metadata limits.
func NewIngestLimits ¶
func NewIngestLimits(cfg Config, logger log.Logger, reg prometheus.Registerer) (*IngestLimits, error)
NewIngestLimits creates a new IngestLimits service. It initializes the metadata map and sets up a Kafka client The client is configured to consume stream metadata from a dedicated topic with the metadata suffix.
func (*IngestLimits) CheckReady ¶
func (s *IngestLimits) CheckReady(ctx context.Context) error
func (*IngestLimits) Collect ¶
func (s *IngestLimits) Collect(m chan<- prometheus.Metric)
func (*IngestLimits) Describe ¶
func (s *IngestLimits) Describe(descs chan<- *prometheus.Desc)
func (*IngestLimits) Flush ¶
func (s *IngestLimits) Flush()
Flush implements ring.FlushTransferer. It transfers state to another ingest limits instance.
func (*IngestLimits) GetAssignedPartitions ¶
func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *logproto.GetAssignedPartitionsRequest) (*logproto.GetAssignedPartitionsResponse, error)
GetAssignedPartitions implements the logproto.IngestLimitsServer interface. It returns the partitions that the tenant is assigned to and the instance still owns.
func (*IngestLimits) GetStreamUsage ¶
func (s *IngestLimits) GetStreamUsage(_ context.Context, req *logproto.GetStreamUsageRequest) (*logproto.GetStreamUsageResponse, error)
GetStreamUsage implements the logproto.IngestLimitsServer interface. It returns the number of active streams for a tenant and the status of requested streams.
func (*IngestLimits) ServeHTTP ¶
func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the http.Handler interface. It returns the current stream counts and status per tenant as a JSON response.
func (*IngestLimits) TransferOut ¶
func (s *IngestLimits) TransferOut(_ context.Context) error
TransferOut implements ring.FlushTransferer. It transfers state to another ingest limits instance.
type PartitionManager ¶
type PartitionManager struct {
// contains filtered or unexported fields
}
PartitionManager keeps track of the partitions assigned and for each partition a timestamp of when it was last updated.
func NewPartitionManager ¶
func NewPartitionManager(logger log.Logger) *PartitionManager
NewPartitionManager returns a new PartitionManager.
func (*PartitionManager) Assign ¶
Assign assigns the partitions and sets the last updated timestamp for each partition to the current time.
func (*PartitionManager) Has ¶
func (m *PartitionManager) Has(partitionID int32) bool
Has returns true if the partition is assigned, otherwise false.
func (*PartitionManager) List ¶
func (m *PartitionManager) List() map[int32]int64
List returns a map of all assigned partitions and their last updated timestamps.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package client provides gRPC client implementation for limits service.
|
Package client provides gRPC client implementation for limits service. |
Package frontend contains provides a frontend service for ingest limits.
|
Package frontend contains provides a frontend service for ingest limits. |
client
Package client provides gRPC client implementation for limits-frontend.
|
Package client provides gRPC client implementation for limits-frontend. |