Documentation
¶
Index ¶
- Variables
- func SegmentRange(dir string) (int, int, error)
- type ChunkStore
- type Config
- type Ingester
- func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error)
- func (i *Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
- func (i *Ingester) CheckReady(ctx context.Context) error
- func (i *Ingester) Flush()
- func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error)
- func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error)
- func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error)
- func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)
- func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error)
- func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error
- func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error
- func (i *Ingester) TransferOut(ctx context.Context) error
- func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
- func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error)
- func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, ...) error
- type Labels
- func (*Labels) Descriptor() ([]byte, []int)
- func (this *Labels) Equal(that interface{}) bool
- func (m *Labels) GetFingerprint() uint64
- func (this *Labels) GoString() string
- func (m *Labels) Marshal() (dAtA []byte, err error)
- func (m *Labels) MarshalTo(dAtA []byte) (int, error)
- func (m *Labels) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Labels) ProtoMessage()
- func (m *Labels) Reset()
- func (m *Labels) Size() (n int)
- func (this *Labels) String() string
- func (m *Labels) Unmarshal(dAtA []byte) error
- func (m *Labels) XXX_DiscardUnknown()
- func (m *Labels) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Labels) XXX_Merge(src proto.Message)
- func (m *Labels) XXX_Size() int
- func (m *Labels) XXX_Unmarshal(b []byte) error
- type Record
- func (*Record) Descriptor() ([]byte, []int)
- func (this *Record) Equal(that interface{}) bool
- func (m *Record) GetLabels() []Labels
- func (m *Record) GetSamples() []Sample
- func (m *Record) GetUserId() string
- func (this *Record) GoString() string
- func (m *Record) Marshal() (dAtA []byte, err error)
- func (m *Record) MarshalTo(dAtA []byte) (int, error)
- func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Record) ProtoMessage()
- func (m *Record) Reset()
- func (m *Record) Size() (n int)
- func (this *Record) String() string
- func (m *Record) Unmarshal(dAtA []byte) error
- func (m *Record) XXX_DiscardUnknown()
- func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Record) XXX_Merge(src proto.Message)
- func (m *Record) XXX_Size() int
- func (m *Record) XXX_Unmarshal(b []byte) error
- type RingCount
- type Sample
- func (*Sample) Descriptor() ([]byte, []int)
- func (this *Sample) Equal(that interface{}) bool
- func (m *Sample) GetFingerprint() uint64
- func (m *Sample) GetTimestamp() uint64
- func (m *Sample) GetValue() float64
- func (this *Sample) GoString() string
- func (m *Sample) Marshal() (dAtA []byte, err error)
- func (m *Sample) MarshalTo(dAtA []byte) (int, error)
- func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Sample) ProtoMessage()
- func (m *Sample) Reset()
- func (m *Sample) Size() (n int)
- func (this *Sample) String() string
- func (m *Sample) Unmarshal(dAtA []byte) error
- func (m *Sample) XXX_DiscardUnknown()
- func (m *Sample) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Sample) XXX_Merge(src proto.Message)
- func (m *Sample) XXX_Size() int
- func (m *Sample) XXX_Unmarshal(b []byte) error
- type Series
- func (*Series) Descriptor() ([]byte, []int)
- func (this *Series) Equal(that interface{}) bool
- func (m *Series) GetChunks() []client.Chunk
- func (m *Series) GetFingerprint() uint64
- func (m *Series) GetUserId() string
- func (this *Series) GoString() string
- func (m *Series) Marshal() (dAtA []byte, err error)
- func (m *Series) MarshalTo(dAtA []byte) (int, error)
- func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Series) ProtoMessage()
- func (m *Series) Reset()
- func (m *Series) Size() (n int)
- func (this *Series) String() string
- func (m *Series) Unmarshal(dAtA []byte) error
- func (m *Series) XXX_DiscardUnknown()
- func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Series) XXX_Merge(src proto.Message)
- func (m *Series) XXX_Size() int
- func (m *Series) XXX_Unmarshal(b []byte) error
- type SeriesLimiter
- type Shipper
- type TSDBState
- type WAL
- type WALConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthWal = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowWal = fmt.Errorf("proto: integer overflow") )
Functions ¶
func SegmentRange ¶ added in v0.6.0
SegmentRange returns the first and last segment index of the WAL in the dir. If https://github.com/prometheus/prometheus/pull/6477 is merged, get rid of this function and use the one from Prometheus directly.
Types ¶
type ChunkStore ¶
type ChunkStore interface {
Put(ctx context.Context, chunks []cortex_chunk.Chunk) error
}
ChunkStore is the interface we need to store chunks
type Config ¶
type Config struct { WALConfig WALConfig `yaml:"walconfig,omitempty"` LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` // Config for transferring chunks. Zero or negative = no retries. MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"` // Config for chunk flushing. FlushCheckPeriod time.Duration RetainPeriod time.Duration MaxChunkIdle time.Duration MaxStaleChunkIdle time.Duration FlushOpTimeout time.Duration MaxChunkAge time.Duration ChunkAgeJitter time.Duration ConcurrentFlushes int SpreadFlushes bool RateUpdatePeriod time.Duration // Use tsdb block storage TSDBEnabled bool `yaml:"-"` TSDBConfig tsdb.Config `yaml:"-"` // Injected at runtime and read from the distributor config, required // to accurately apply global limits. ShardByAllLabels bool `yaml:"-"` // contains filtered or unexported fields }
Config for an Ingester.
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to configure this to the given FlagSet.
type Ingester ¶
type Ingester struct { services.Service // Prometheus block storage TSDBState TSDBState // contains filtered or unexported fields }
Ingester deals with "in flight" chunks. Based on Prometheus 1.x MemorySeriesStorage.
func New ¶
func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error)
New constructs a new Ingester.
func NewV2 ¶ added in v0.4.0
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error)
NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
func (*Ingester) AllUserStats ¶
func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error)
AllUserStats returns ingestion statistics for all users known to this ingester.
func (*Ingester) Check ¶
func (i *Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
Check implements the grpc healthcheck.
func (*Ingester) CheckReady ¶ added in v0.7.0
CheckReady is used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester. It returns nil when the ingester is ready and an error otherwise (surfaced by the readiness endpoint as 204 when ready, 500 otherwise).
func (*Ingester) Flush ¶
func (i *Ingester) Flush()
Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.
func (*Ingester) FlushHandler ¶
func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)
FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.
func (*Ingester) LabelNames ¶
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error)
LabelNames returns all the label names.
func (*Ingester) LabelValues ¶
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error)
LabelValues returns all label values that are associated with a given label name.
func (*Ingester) MetricsForLabelMatchers ¶
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error)
MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (*Ingester) Push ¶
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)
Push implements client.IngesterServer
func (*Ingester) Query ¶
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error)
Query implements client.IngesterServer
func (*Ingester) QueryStream ¶
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error
QueryStream implements client.IngesterServer
func (*Ingester) ShutdownHandler ¶ added in v0.4.0
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownHandler triggers the following set of operations in order:
- Change the state of ring to stop accepting writes.
- Flush all the chunks.
func (*Ingester) TransferChunks ¶
func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error
TransferChunks receives all the chunks from another ingester.
func (*Ingester) TransferOut ¶
TransferOut finds an ingester in PENDING state and transfers our chunks to it. Called as part of the ingester shutdown process.
func (*Ingester) TransferTSDB ¶ added in v0.4.0
func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error
TransferTSDB receives all the file chunks from another ingester, and writes them to tsdb directories
func (*Ingester) UserStats ¶
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error)
UserStats returns ingestion statistics for the current user.
func (*Ingester) Watch ¶
func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error
Watch implements the grpc healthcheck.
type Labels ¶ added in v0.6.0
type Labels struct { Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `` /* 131-byte string literal not displayed */ }
func (*Labels) Descriptor ¶ added in v0.6.0
func (*Labels) GetFingerprint ¶ added in v0.6.0
func (*Labels) MarshalToSizedBuffer ¶ added in v0.7.0
func (*Labels) ProtoMessage ¶ added in v0.6.0
func (*Labels) ProtoMessage()
func (*Labels) XXX_DiscardUnknown ¶ added in v0.6.0
func (m *Labels) XXX_DiscardUnknown()
func (*Labels) XXX_Marshal ¶ added in v0.6.0
func (*Labels) XXX_Unmarshal ¶ added in v0.6.0
type Record ¶ added in v0.6.0
type Record struct { UserId string `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` Labels []Labels `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels"` Samples []Sample `protobuf:"bytes,3,rep,name=samples,proto3" json:"samples"` }
func (*Record) Descriptor ¶ added in v0.6.0
func (*Record) GetSamples ¶ added in v0.6.0
func (*Record) MarshalToSizedBuffer ¶ added in v0.7.0
func (*Record) ProtoMessage ¶ added in v0.6.0
func (*Record) ProtoMessage()
func (*Record) XXX_DiscardUnknown ¶ added in v0.6.0
func (m *Record) XXX_DiscardUnknown()
func (*Record) XXX_Marshal ¶ added in v0.6.0
func (*Record) XXX_Unmarshal ¶ added in v0.6.0
type RingCount ¶ added in v0.4.0
type RingCount interface {
HealthyInstancesCount() int
}
RingCount is the interface exposed by a ring implementation which allows counting its members.
type Sample ¶ added in v0.6.0
type Sample struct { Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Value float64 `protobuf:"fixed64,3,opt,name=value,proto3" json:"value,omitempty"` }
func (*Sample) Descriptor ¶ added in v0.6.0
func (*Sample) GetFingerprint ¶ added in v0.6.0
func (*Sample) GetTimestamp ¶ added in v0.6.0
func (*Sample) MarshalToSizedBuffer ¶ added in v0.7.0
func (*Sample) ProtoMessage ¶ added in v0.6.0
func (*Sample) ProtoMessage()
func (*Sample) XXX_DiscardUnknown ¶ added in v0.6.0
func (m *Sample) XXX_DiscardUnknown()
func (*Sample) XXX_Marshal ¶ added in v0.6.0
func (*Sample) XXX_Unmarshal ¶ added in v0.6.0
type Series ¶ added in v0.6.0
type Series struct { UserId string `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `` /* 131-byte string literal not displayed */ Chunks []client.Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` }
func (*Series) Descriptor ¶ added in v0.6.0
func (*Series) GetFingerprint ¶ added in v0.6.0
func (*Series) MarshalToSizedBuffer ¶ added in v0.7.0
func (*Series) ProtoMessage ¶ added in v0.6.0
func (*Series) ProtoMessage()
func (*Series) XXX_DiscardUnknown ¶ added in v0.6.0
func (m *Series) XXX_DiscardUnknown()
func (*Series) XXX_Marshal ¶ added in v0.6.0
func (*Series) XXX_Unmarshal ¶ added in v0.6.0
type SeriesLimiter ¶ added in v0.4.0
type SeriesLimiter struct {
// contains filtered or unexported fields
}
SeriesLimiter implements primitives to get the maximum number of series an ingester can handle for a specific tenant
func NewSeriesLimiter ¶ added in v0.4.0
func NewSeriesLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int, shardByAllLabels bool) *SeriesLimiter
NewSeriesLimiter makes a new in-memory series limiter
func (*SeriesLimiter) AssertMaxSeriesPerMetric ¶ added in v0.4.0
func (l *SeriesLimiter) AssertMaxSeriesPerMetric(userID string, series int) error
AssertMaxSeriesPerMetric asserts that the max-series-per-metric limit has not been reached, compared to the current number of series given in input, and returns an error if it has.
func (*SeriesLimiter) AssertMaxSeriesPerUser ¶ added in v0.4.0
func (l *SeriesLimiter) AssertMaxSeriesPerUser(userID string, series int) error
AssertMaxSeriesPerUser asserts that the max-series-per-user limit has not been reached, compared to the current number of series given in input, and returns an error if it has.
func (*SeriesLimiter) MaxSeriesPerQuery ¶ added in v0.4.0
func (l *SeriesLimiter) MaxSeriesPerQuery(userID string) int
MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit.
type TSDBState ¶ added in v0.4.0
type TSDBState struct {
// contains filtered or unexported fields
}
TSDBState holds data structures used by the TSDB storage engine
type WAL ¶ added in v0.6.0
type WAL interface { // Log marshalls the records and writes it into the WAL. Log(*Record) error // Stop stops all the WAL operations. Stop() }
WAL interface allows us to have a no-op WAL when the WAL is disabled.
type WALConfig ¶ added in v0.6.0
type WALConfig struct { WALEnabled bool `yaml:"wal_enabled,omitempty"` CheckpointEnabled bool `yaml:"checkpoint_enabled,omitempty"` Recover bool `yaml:"recover_from_wal,omitempty"` Dir string `yaml:"wal_dir,omitempty"` CheckpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"` // contains filtered or unexported fields }
WALConfig is config for the Write Ahead Log.
func (*WALConfig) RegisterFlags ¶ added in v0.6.0
RegisterFlags adds the flags required to configure this to the given FlagSet.