Documentation
¶
Index ¶
- func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, ...) *rateStore
- type Config
- type Distributor
- func (d *Distributor) HealthyInstancesCount() int
- func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type Limits
- type RateStore
- type RateStoreConfig
- type ReadLifecycler
- type RingConfig
- type ShardTracker
- type Validator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRateStore ¶
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore
Types ¶
type Config ¶
type Config struct {
// Distributors ring
DistributorRing RingConfig `yaml:"ring,omitempty"`
// RateStore customizes the rate storing used by stream sharding.
RateStore RateStoreConfig `yaml:"rate_store"`
// WriteFailuresLogging customizes write failures logging behavior.
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Experimental. Customize the logging of write failures."`
// contains filtered or unexported fields
}
Config for a Distributor.
func (*Config) RegisterFlags ¶
func (cfg *Config) RegisterFlags(fs *flag.FlagSet)
RegisterFlags registers distributor-related flags.
type Distributor ¶
type Distributor struct {
services.Service
// contains filtered or unexported fields
}
Distributor coordinates replication and distribution of log streams.
func New ¶
func New(
cfg Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
overrides Limits,
registerer prometheus.Registerer,
) (*Distributor, error)
New creates a distributor.
func (*Distributor) HealthyInstancesCount ¶
func (d *Distributor) HealthyInstancesCount() int
HealthyInstancesCount implements the ReadLifecycler interface.
We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.
func (*Distributor) Push ¶
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
Push a set of streams. The returned error is the last one seen.
func (*Distributor) PushHandler ¶
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
PushHandler reads a snappy-compressed proto from the HTTP body.
func (*Distributor) ServeHTTP ¶
func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the distributor ring status page.
If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.
type Limits ¶
type Limits interface {
retention.Limits
MaxLineSize(userID string) int
MaxLineSizeTruncate(userID string) bool
EnforceMetricName(userID string) bool
MaxLabelNamesPerSeries(userID string) int
MaxLabelNameLength(userID string) int
MaxLabelValueLength(userID string) int
CreationGracePeriod(userID string) time.Duration
RejectOldSamples(userID string) bool
RejectOldSamplesMaxAge(userID string) time.Duration
IncrementDuplicateTimestamps(userID string) bool
ShardStreams(userID string) *shardstreams.Config
IngestionRateStrategy() string
IngestionRateBytes(userID string) float64
IngestionBurstSizeBytes(userID string) int
}
Limits is an interface for distributor limits and related configs.
type RateStore ¶
type RateStore interface {
RateFor(tenantID string, streamHash uint64) (int64, float64)
}
RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStoreConfig ¶
type RateStoreConfig struct {
MaxParallelism int `yaml:"max_request_parallelism"`
StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
IngesterReqTimeout time.Duration `yaml:"ingester_request_timeout"`
Debug bool `yaml:"debug"`
}
func (*RateStoreConfig) RegisterFlagsWithPrefix ¶
func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)
type ReadLifecycler ¶
type ReadLifecycler interface {
HealthyInstancesCount() int
}
ReadLifecycler represents the read interface to the lifecycler.
type RingConfig ¶
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
// Instance details
InstanceID string `yaml:"instance_id" doc:"hidden"`
InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
InstancePort int `yaml:"instance_port" doc:"hidden"`
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
EnableIPv6 bool `yaml:"instance_enable_ipv6" doc:"hidden"`
// Injected internally
ListenPort int `yaml:"-"`
}
RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure the distributor ring to the given FlagSet.
func (*RingConfig) ToBasicLifecyclerConfig ¶
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardTracker ¶
type ShardTracker struct {
// contains filtered or unexported fields
}
ShardTracker is a data structure to keep track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes even when any given push has fewer entries than the calculated number of shards.
func NewShardTracker ¶
func NewShardTracker() *ShardTracker
func (*ShardTracker) LastShardNum ¶
func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int
func (*ShardTracker) SetLastShardNum ¶
func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)
type Validator ¶
type Validator struct {
Limits
}
func NewValidator ¶
func NewValidator(l Limits) (*Validator, error)
func (Validator) ValidateEntry ¶
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error
ValidateEntry returns an error if the entry is invalid and reports metrics for invalid entries accordingly.
func (Validator) ValidateLabels ¶
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error
ValidateLabels returns an error if the labels are invalid.