Documentation
¶
Index ¶
- func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, ...) *rateStore
- type Config
- type Distributor
- func (d *Distributor) HealthyInstancesCount() int
- func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type FieldDetector
- type KafkaProducer
- type KeyedStream
- type Limits
- type RateStore
- type RateStoreConfig
- type ReadLifecycler
- type RingConfig
- type ShardTracker
- type Tee
- type Validator
- func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int)
- func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, ...) error
- func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRateStore ¶
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore
Types ¶
type Config ¶
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
PushWorkerCount int `yaml:"push_worker_count"`
// RateStore customizes the rate storing used by stream sharding.
RateStore RateStoreConfig `yaml:"rate_store"`
// WriteFailuresLoggingCfg customizes write failures logging behavior.
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."`
OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
KafkaConfig kafka.Config `yaml:"-"`
// contains filtered or unexported fields
}
Config for a Distributor.
func (*Config) RegisterFlags ¶
func (cfg *Config) RegisterFlags(fs *flag.FlagSet)
RegisterFlags registers distributor-related flags.
type Distributor ¶
type Distributor struct {
services.Service
RequestParserWrapper push.RequestParserWrapper
// contains filtered or unexported fields
}
Distributor coordinates replicates and distribution of log streams.
func New ¶
func New(
cfg Config,
ingesterCfg ingester.Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
partitionRing ring.PartitionRingReader,
overrides Limits,
registerer prometheus.Registerer,
metricsNamespace string,
tee Tee,
usageTracker push.UsageTracker,
logger log.Logger,
) (*Distributor, error)
New a distributor creates.
func (*Distributor) HealthyInstancesCount ¶
func (d *Distributor) HealthyInstancesCount() int
HealthyInstancesCount implements the ReadLifecycler interface.
We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.
func (*Distributor) OTLPPushHandler ¶
func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request)
func (*Distributor) Push ¶
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
Push a set of streams. The returned error is the last one seen.
func (*Distributor) PushHandler ¶
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
PushHandler reads a snappy-compressed proto from the HTTP body.
func (*Distributor) ServeHTTP ¶
func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the distributor ring status page.
If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.
type FieldDetector ¶ added in v3.4.0
type FieldDetector struct {
// contains filtered or unexported fields
}
type KafkaProducer ¶ added in v3.3.0
type KafkaProducer interface {
ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
Close()
}
type KeyedStream ¶
type KeyedStream struct {
HashKey uint32
Stream logproto.Stream
}
type Limits ¶
type Limits interface {
retention.Limits
MaxLineSize(userID string) int
MaxLineSizeTruncate(userID string) bool
MaxLabelNamesPerSeries(userID string) int
MaxLabelNameLength(userID string) int
MaxLabelValueLength(userID string) int
CreationGracePeriod(userID string) time.Duration
RejectOldSamples(userID string) bool
RejectOldSamplesMaxAge(userID string) time.Duration
IncrementDuplicateTimestamps(userID string) bool
DiscoverServiceName(userID string) []string
DiscoverGenericFields(userID string) map[string][]string
DiscoverLogLevels(userID string) bool
LogLevelFields(userID string) []string
ShardStreams(userID string) shardstreams.Config
IngestionRateStrategy() string
IngestionRateBytes(userID string) float64
IngestionBurstSizeBytes(userID string) int
AllowStructuredMetadata(userID string) bool
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
EnforcedLabels(userID string) []string
IngestionPartitionsTenantShardSize(userID string) int
}
Limits is an interface for distributor limits/related configs
type RateStore ¶
type RateStore interface {
RateFor(tenantID string, streamHash uint64) (int64, float64)
}
RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStoreConfig ¶
type RateStoreConfig struct {
MaxParallelism int `yaml:"max_request_parallelism"`
StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
IngesterReqTimeout time.Duration `yaml:"ingester_request_timeout"`
Debug bool `yaml:"debug"`
}
func (*RateStoreConfig) RegisterFlagsWithPrefix ¶
func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)
type ReadLifecycler ¶
type ReadLifecycler interface {
HealthyInstancesCount() int
}
ReadLifecycler represents the read interface to the lifecycler.
type RingConfig ¶
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
// Instance details
InstanceID string `yaml:"instance_id" doc:"hidden"`
InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
InstancePort int `yaml:"instance_port" doc:"hidden"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
EnableIPv6 bool `yaml:"instance_enable_ipv6" doc:"hidden"`
// Injected internally
ListenPort int `yaml:"-"`
}
RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*RingConfig) ToBasicLifecyclerConfig ¶
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardTracker ¶
type ShardTracker struct {
// contains filtered or unexported fields
}
ShardTracker is a data structure to keep track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes even when any given push has fewer entries than the calculated number of shards
func NewShardTracker ¶
func NewShardTracker() *ShardTracker
func (*ShardTracker) LastShardNum ¶
func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int
func (*ShardTracker) SetLastShardNum ¶
func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)
type Tee ¶
type Tee interface {
Duplicate(tenant string, streams []KeyedStream)
}
Tee implementations can duplicate the log streams to another endpoint.
type Validator ¶
type Validator struct {
Limits
// contains filtered or unexported fields
}
func NewValidator ¶
func NewValidator(l Limits, t push.UsageTracker) (*Validator, error)
func (Validator) ShouldBlockIngestion ¶ added in v3.2.0
func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int)
ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code.
func (Validator) ValidateEntry ¶
func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error
ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly.
func (Validator) ValidateLabels ¶
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error
Validate labels returns an error if the labels are invalid