Documentation
¶
Overview ¶
The bloom gateway is a component that can be run as a standalone microservice target and provides capabilities for filtering ChunkRefs based on a given list of line filter expressions.
Index ¶
- Constants
- type AddressProvider
- type BlockResolver
- type BloomQuerier
- type Client
- type ClientConfig
- type ClientFactory
- type Config
- type GRPCPool
- type Gateway
- type GatewayClient
- type JumpHashClientPool
- type Limits
- type PoolConfig
- type QuerierConfig
- type Stats
- func (s *Stats) AddBlocksFetchTime(t time.Duration)
- func (s *Stats) AddPostProcessingTime(t time.Duration)
- func (s *Stats) AddProcessedBlocksTotal(delta int)
- func (s *Stats) AddProcessingTime(t time.Duration)
- func (s *Stats) AddQueueTime(t time.Duration)
- func (s *Stats) AddTotalProcessingTime(t time.Duration)
- func (s *Stats) Duration() (dur time.Duration)
- func (s *Stats) IncProcessedBlocks()
- func (s *Stats) IncSkippedBlocks()
- func (s *Stats) KVArgs() []any
- type Task
Constants ¶
const (
Day = 24 * time.Hour
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddressProvider ¶
type AddressProvider interface {
Addresses() []string
}
type BlockResolver ¶ added in v3.1.0
type BlockResolver interface {
Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error)
}
func NewBlockResolver ¶ added in v3.1.0
func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver
type BloomQuerier ¶
type BloomQuerier struct {
// contains filtered or unexported fields
}
BloomQuerier is a store-level abstraction on top of Client. It is used by the index gateway to filter ChunkRefs based on the given line filter expressions.
func NewQuerier ¶
func NewQuerier(c Client, cfg QuerierConfig, limits Limits, resolver BlockResolver, r prometheus.Registerer, logger log.Logger) *BloomQuerier
func (*BloomQuerier) FilterChunkRefs ¶
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, series map[uint64]labels.Labels, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, bool, error)
type Client ¶
type Client interface {
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error
}
type ClientConfig ¶
type ClientConfig struct {
// PoolConfig defines the behavior of the gRPC connection pool used to communicate
// with the Bloom Gateway.
PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures the behavior of the connection pool."`
// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
// Client sharding using DNS discovery and jumphash
Addresses string `yaml:"addresses,omitempty"`
}
ClientConfig configures the Bloom Gateway client used to communicate with the Bloom Gateway server.
func (*ClientConfig) RegisterFlags ¶
func (i *ClientConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags registers flags for the Bloom Gateway client configuration.
func (*ClientConfig) RegisterFlagsWithPrefix ¶
func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix.
type ClientFactory ¶ added in v3.3.0
type ClientFactory func(addr string) (client.PoolClient, error)
type Config ¶
type Config struct {
// Enabled is the global switch that configures whether Bloom Gateways should be used to filter chunks.
Enabled bool `yaml:"enabled"`
// Client configures the Bloom Gateway client
Client ClientConfig `yaml:"client,omitempty" doc:""`
WorkerConcurrency int `yaml:"worker_concurrency"`
BlockQueryConcurrency int `yaml:"block_query_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
NumMultiplexItems int `yaml:"num_multiplex_tasks"`
FetchBlocksAsync bool `yaml:"fetch_blocks_async" doc:"hidden"`
}
Config configures the Bloom Gateway component.
func (*Config) RegisterFlags ¶
func (cfg *Config) RegisterFlags(f *flag.FlagSet)
RegisterFlags registers flags for the Bloom Gateway configuration.
func (*Config) RegisterFlagsWithPrefix ¶
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
type GRPCPool ¶
type GRPCPool struct {
grpc_health_v1.HealthClient
logproto.BloomGatewayClient
io.Closer
}
GRPCPool represents a pool of gRPC connections to different bloom gateway instances. Interfaces are inlined for simplicity to automatically satisfy interface functions.
func NewBloomGatewayGRPCPool ¶
func NewBloomGatewayGRPCPool(address string, opts []grpc.DialOption) (*GRPCPool, error)
NewBloomGatewayGRPCPool instantiates a new pool of gRPC connections for the Bloom Gateway. Internally, it also instantiates a protobuf bloom gateway client and a health client.
type Gateway ¶
type Gateway struct {
services.Service
// contains filtered or unexported fields
}
func New ¶
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error)
New returns a new instance of the Bloom Gateway.
func (*Gateway) FilterChunkRefs ¶
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error)
FilterChunkRefs implements BloomGatewayServer
func (*Gateway) PrefetchBloomBlocks ¶ added in v3.4.0
func (g *Gateway) PrefetchBloomBlocks(_ context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error)
type GatewayClient ¶
type GatewayClient struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(cfg ClientConfig, registerer prometheus.Registerer, logger log.Logger) (*GatewayClient, error)
func (*GatewayClient) FilterChunks ¶
func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
FilterChunks implements Client
func (*GatewayClient) PrefetchBloomBlocks ¶ added in v3.4.0
func (c *GatewayClient) PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error
type JumpHashClientPool ¶
type JumpHashClientPool struct {
services.Service
*jumphash.Selector
sync.RWMutex
// contains filtered or unexported fields
}
func NewJumpHashClientPool ¶
func NewJumpHashClientPool(clientFactory ClientFactory, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) (*JumpHashClientPool, error)
func (*JumpHashClientPool) Addr ¶ added in v3.1.0
func (p *JumpHashClientPool) Addr(key string) (string, error)
func (*JumpHashClientPool) GetClientFor ¶ added in v3.3.0
func (p *JumpHashClientPool) GetClientFor(addr string) (client.PoolClient, error)
GetClientFor implements clientPool.
type PoolConfig ¶
type PoolConfig struct {
CheckInterval time.Duration `yaml:"check_interval"`
}
PoolConfig is config for creating a Pool.
func (*PoolConfig) RegisterFlagsWithPrefix ¶
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet, using a common prefix.
type QuerierConfig ¶ added in v3.1.0
type QuerierConfig struct {
BuildInterval time.Duration
BuildTableOffset int
}
type Stats ¶
type Stats struct {
Status string
NumTasks, NumMatchers int
ChunksRequested, ChunksFiltered int
SeriesRequested, SeriesFiltered int
QueueTime *atomic.Duration
BlocksFetchTime *atomic.Duration
ProcessingTime, TotalProcessingTime *atomic.Duration
PostProcessingTime *atomic.Duration
SkippedBlocks *atomic.Int32 // blocks skipped because they were not available (yet)
ProcessedBlocks *atomic.Int32 // blocks processed for this specific request
ProcessedBlocksTotal *atomic.Int32 // blocks processed for multiplexed request
}
func ContextWithEmptyStats ¶
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context)
ContextWithEmptyStats returns a context with empty stats.
func FromContext ¶
func FromContext(ctx context.Context) *Stats
FromContext gets the Stats out of the Context. Returns nil if stats have not been initialised in the context.
func (*Stats) AddBlocksFetchTime ¶
func (s *Stats) AddBlocksFetchTime(t time.Duration)
func (*Stats) AddPostProcessingTime ¶
func (s *Stats) AddPostProcessingTime(t time.Duration)
func (*Stats) AddProcessedBlocksTotal ¶ added in v3.3.0
func (s *Stats) AddProcessedBlocksTotal(delta int)
func (*Stats) AddProcessingTime ¶
func (s *Stats) AddProcessingTime(t time.Duration)
func (*Stats) AddQueueTime ¶
func (s *Stats) AddQueueTime(t time.Duration)
func (*Stats) AddTotalProcessingTime ¶
func (s *Stats) AddTotalProcessingTime(t time.Duration)
func (*Stats) Duration ¶
func (s *Stats) Duration() (dur time.Duration)
Duration aggregates the total duration.
func (*Stats) IncProcessedBlocks ¶
func (s *Stats) IncProcessedBlocks()
func (*Stats) IncSkippedBlocks ¶ added in v3.4.0
func (s *Stats) IncSkippedBlocks()
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is the data structure that is enqueued to the internal queue and dequeued by query workers.
func (Task) Bounds ¶
func (t Task) Bounds() (model.Time, model.Time)
Bounds implements Bounded; see pkg/storage/stores/shipper/indexshipper/tsdb.Bounded.
func (Task) CloseWithError ¶
func (t Task) CloseWithError(err error)
func (Task) Copy ¶
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task
Copy returns a copy of the existing task but with a new slice of grouped chunk refs.
func (Task) RequestIter ¶
func (t Task) RequestIter() iter.Iterator[v1.Request]