Documentation ¶
Index ¶
- Variables
- type Mapable
- type Mapping
- func (m *Mapping[v]) Get(idx QueueIndex) v
- func (m *Mapping[v]) GetByKey(key string) v
- func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error)
- func (m *Mapping[v]) Init(size int)
- func (m *Mapping[v]) Keys() []string
- func (m *Mapping[v]) Len() int
- func (m *Mapping[v]) Put(key string, value v) bool
- func (m *Mapping[v]) Remove(key string) bool
- func (m *Mapping[v]) Values() []v
- type Metrics
- type Queue
- type QueueIndex
- type QueuePath
- type Request
- type RequestChannel
- type RequestQueue
- func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error)
- func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error
- func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
- func (q *RequestQueue) NotifyQuerierShutdown(querierID string)
- func (q *RequestQueue) RegisterQuerierConnection(querier string)
- func (q *RequestQueue) UnregisterQuerierConnection(querier string)
- type TreeQueue
Constants ¶
This section is empty.
Variables ¶
var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
)
var ErrOutOfBounds = errors.New("queue index out of bounds")
Functions ¶
This section is empty.
Types ¶
type Mapable ¶
type Mapable interface {
	*tenantQueue | *TreeQueue
	// https://github.com/golang/go/issues/48522#issuecomment-924348755
	Pos() QueueIndex
	SetPos(index QueueIndex)
}
type Mapping ¶
type Mapping[v Mapable] struct {
	// contains filtered or unexported fields
}
Mapping is a map-like data structure that allows its items to be accessed not only by key but also by index. When an item is removed, the internal key array is not resized; instead, the removed slot is marked as empty. This allows keys to be removed without changing the index of the items that follow the removed key. Mapping uses *tenantQueue as its concrete value type and keys of type string. The data structure is not thread-safe.
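The index-stable removal described above can be illustrated with a small standalone sketch. It does not use the package's Mapping type (whose fields are unexported); slotMap and its methods are hypothetical names, values are plain ints, and the empty string is reserved as the "empty slot" marker. Reusing an empty slot on insert is a choice made for this sketch, not a documented property of Mapping.

```go
package main

import "fmt"

// slotMap is a hypothetical, simplified stand-in for Mapping: values are
// reachable by key and by a stable slot index.
type slotMap struct {
	keys   []string       // slot index -> key; "" marks an empty slot
	values map[string]int // key -> value
}

func newSlotMap() *slotMap {
	return &slotMap{values: make(map[string]int)}
}

// indexOf returns the slot index of key, or -1 if the key is absent.
func (m *slotMap) indexOf(key string) int {
	for i, k := range m.keys {
		if k == key {
			return i
		}
	}
	return -1
}

// Put stores value under key and returns the key's slot index.
// A new key takes the first empty slot, or a fresh slot at the end.
func (m *slotMap) Put(key string, value int) int {
	if _, ok := m.values[key]; ok {
		m.values[key] = value
		return m.indexOf(key)
	}
	m.values[key] = value
	for i, k := range m.keys {
		if k == "" {
			m.keys[i] = key
			return i
		}
	}
	m.keys = append(m.keys, key)
	return len(m.keys) - 1
}

// Remove deletes key and marks its slot as empty without shifting other slots.
func (m *slotMap) Remove(key string) bool {
	if _, ok := m.values[key]; !ok {
		return false
	}
	m.keys[m.indexOf(key)] = ""
	delete(m.values, key)
	return true
}

func main() {
	m := newSlotMap()
	m.Put("a", 1)
	m.Put("b", 2)
	c := m.Put("c", 3)
	m.Remove("b")
	// "c" keeps its slot even though an earlier key was removed.
	fmt.Println(c == m.indexOf("c"))
}
```

Because slots never shift, an index held by a caller between two calls still points at the same position after unrelated removals, which is what makes index-based iteration resumable.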
func (*Mapping[v]) Get ¶
func (m *Mapping[v]) Get(idx QueueIndex) v
func (*Mapping[v]) GetNext ¶
func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error)
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics(subsystem string, registerer prometheus.Registerer) *Metrics
type Queue ¶
type Queue interface {
	Chan() RequestChannel
	Dequeue() Request
	Name() string
	Len() int
}
type QueueIndex ¶
type QueueIndex int // nolint:revive
QueueIndex is an opaque type that allows iteration over tenants to resume between successive calls of the RequestQueue.Dequeue method.
var StartIndex QueueIndex = -1
StartIndex is the index of the queue that starts iteration over sub queues.
var StartIndexWithLocalQueue QueueIndex = -2
StartIndexWithLocalQueue is the index of the queue that starts iteration over local and sub queues.
func (QueueIndex) ReuseLastIndex ¶
func (ui QueueIndex) ReuseLastIndex() QueueIndex
ReuseLastIndex modifies the index so that iteration starts on the same tenant for which the last queue was returned.
type RequestChannel ¶
type RequestChannel chan Request
RequestChannel is a channel that queues Requests.
type RequestQueue ¶
RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant a specified number of queriers, and when a querier asks for the next request to handle (using Dequeue), it returns requests in a fair fashion.
func NewRequestQueue ¶
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue
func (*RequestQueue) Dequeue ¶
func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error)
Dequeue finds the next tenant queue and takes the next request off of it. It blocks if there are no requests. By passing the tenant index returned by the previous call of this method, the querier guarantees that it iterates over all tenants fairly. If the querier finds that the request from the tenant has already expired, it can get another request for the same tenant by using QueueIndex.ReuseLastIndex.
func (*RequestQueue) Enqueue ¶
func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error
Enqueue puts the request into the queue. maxQueriers is a tenant-specific value that specifies how many queriers the tenant can use (zero or negative means all queriers). It is passed to each Enqueue call because it can change between calls.
If the request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
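On the caller side, a typical pattern is to treat ErrTooManyRequests as a load-shedding signal and to keep successFn small, since it runs while the queue's lock is held. The sketch below uses a local interface with the same Enqueue signature. The enqueuer interface, submit, and boundedQueue are hypothetical, and passing a nil path is an assumption that the fake queue simply ignores.

```go
package main

import (
	"errors"
	"fmt"
)

// Request is a local stand-in for the package's Request type.
type Request any

var ErrTooManyRequests = errors.New("too many outstanding requests")

// enqueuer captures only the Enqueue method documented above.
type enqueuer interface {
	Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error
}

// submit enqueues req and reports whether it was rejected because the tenant
// already has too many outstanding requests. The success callback only bumps
// a counter, keeping the work done under the queue's lock minimal.
func submit(q enqueuer, tenant string, req Request, maxQueriers int, enqueued *int) (rejected bool, err error) {
	err = q.Enqueue(tenant, nil, req, maxQueriers, func() { *enqueued++ })
	if errors.Is(err, ErrTooManyRequests) {
		return true, nil
	}
	return false, err
}

// boundedQueue is a hypothetical enqueuer with a per-tenant cap, used only
// to exercise submit. It ignores path and maxQueriers.
type boundedQueue struct {
	limit   int
	pending map[string][]Request
}

func (b *boundedQueue) Enqueue(tenant string, _ []string, req Request, _ int, successFn func()) error {
	if len(b.pending[tenant]) >= b.limit {
		return ErrTooManyRequests
	}
	b.pending[tenant] = append(b.pending[tenant], req)
	if successFn != nil {
		successFn()
	}
	return nil
}

func main() {
	q := &boundedQueue{limit: 1, pending: map[string][]Request{}}
	var enqueued int
	for _, r := range []Request{"q1", "q2"} {
		rejected, err := submit(q, "tenant-a", r, 0, &enqueued)
		fmt.Println(rejected, err)
	}
	fmt.Println(enqueued)
}
```

Because successFn runs with the lock held, it should not block or call back into the queue; updating a counter or recording a timestamp is the kind of work that fits there.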
func (*RequestQueue) GetConnectedQuerierWorkersMetric ¶
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64
func (*RequestQueue) NotifyQuerierShutdown ¶
func (q *RequestQueue) NotifyQuerierShutdown(querierID string)
func (*RequestQueue) RegisterQuerierConnection ¶
func (q *RequestQueue) RegisterQuerierConnection(querier string)
func (*RequestQueue) UnregisterQuerierConnection ¶
func (q *RequestQueue) UnregisterQuerierConnection(querier string)
type TreeQueue ¶
type TreeQueue struct {
// contains filtered or unexported fields
}
TreeQueue is a hierarchical queue implementation in which every sub-queue has the same chance of being chosen. Each queue also has a local queue, which is chosen with the same preference as the sub-queues.
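The fairness rule above (the local queue and every sub-queue take equal turns) can be sketched with a rotating position per node. This is not the TreeQueue implementation, whose fields are unexported. The node type, its fields, and the plain round-robin policy are assumptions chosen to illustrate the idea.

```go
package main

import "fmt"

// node is a hypothetical stand-in for TreeQueue: it owns a local queue and
// any number of child queues.
type node struct {
	local    []string
	children []*node
	next     int // rotating position: 0 is the local queue, i > 0 is children[i-1]
}

// dequeue returns the next item, giving the local queue and each child one
// turn per rotation. Empty positions are skipped within the same call.
func (n *node) dequeue() (string, bool) {
	slots := len(n.children) + 1
	for tried := 0; tried < slots; tried++ {
		pos := n.next
		n.next = (n.next + 1) % slots
		if pos == 0 {
			if len(n.local) > 0 {
				item := n.local[0]
				n.local = n.local[1:]
				return item, true
			}
			continue
		}
		if item, ok := n.children[pos-1].dequeue(); ok {
			return item, true
		}
	}
	return "", false
}

func main() {
	root := &node{
		local: []string{"L1", "L2"},
		children: []*node{
			{local: []string{"A1", "A2"}},
			{local: []string{"B1"}},
		},
	}
	for item, ok := root.dequeue(); ok; item, ok = root.dequeue() {
		fmt.Print(item, " ")
	}
	fmt.Println() // prints "L1 A1 B1 L2 A2 "
}
```

Each child applies the same rotation to its own local queue and children, so the equal-turn guarantee holds at every level of the tree.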