Documentation
¶
Index ¶
- func PeekNMultipleQueues(limit int, queueMap map[storj.PlacementConstraint]*Queue) (jobs []jobq.RepairJob)
- func PopNMultipleQueues(limit int, queueMap map[storj.PlacementConstraint]*Queue) (jobs []jobq.RepairJob)
- type Queue
- func (q *Queue) Clean(updatedBefore time.Time) (removed int)
- func (q *Queue) Delete(streamID uuid.UUID, position uint64) (wasDeleted bool)
- func (q *Queue) Destroy()
- func (q *Queue) Insert(job jobq.RepairJob) (wasNew bool)
- func (q *Queue) Inspect(streamID uuid.UUID, position uint64) (job jobq.RepairJob, ok bool)
- func (q *Queue) Len() (inRepair, inRetry int64)
- func (q *Queue) Peek() (job jobq.RepairJob, ok bool)
- func (q *Queue) PeekRetry() jobq.RepairJob
- func (q *Queue) Pop() (job jobq.RepairJob, ok bool)
- func (q *Queue) ResetTimer() error
- func (q *Queue) Start() error
- func (q *Queue) Stat(ctx context.Context) (repairStat, retryStat jobq.QueueStat, err error)
- func (q *Queue) Stop()
- func (q *Queue) TestingSetAttemptedTime(streamID uuid.UUID, position uint64, lastAttemptedAt time.Time) (rowsAffected int)
- func (q *Queue) TestingSetUpdatedTime(streamID uuid.UUID, position uint64, updatedAt time.Time) (rowsAffected int)
- func (q *Queue) Trim(healthGreaterThan float64) (removed int)
- func (q *Queue) Truncate()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PeekNMultipleQueues ¶ added in v1.125.2
func PeekNMultipleQueues(limit int, queueMap map[storj.PlacementConstraint]*Queue) (jobs []jobq.RepairJob)
PeekNMultipleQueues returns the 'limit' segments with the lowest health from any of the given queues without removing them from the queues. If there are fewer than 'limit' segments in all of the queues, it returns all available. Checks only the repair queues, not the retry queues.
This function is useful for combining multiple queues into a single view of the lowest health segments across all of them. Older repair code expects a single queue containing all placements and all jobs whether eligible for retry or not, so this function allows similar usage. This is not very performant, but as far as I can tell we only need this in test situations.
func PopNMultipleQueues ¶ added in v1.125.2
func PopNMultipleQueues(limit int, queueMap map[storj.PlacementConstraint]*Queue) (jobs []jobq.RepairJob)
PopNMultipleQueues removes and returns the 'limit' segments with the lowest health from any of the given queues without removing them from the queues. If there are fewer than 'limit' segments in all of the queues, it returns all available. Checks only the repair queues, not the retry queues.
This function is useful for combining multiple queues into a single view of the lowest health segments across all of them. Older repair code expects a single queue containing all placements and all jobs whether eligible for retry or not, so this function allows similar usage. Hopefully soon we can teach the repair workers to ask for jobs from each placement separately.
Types ¶
type Queue ¶
type Queue struct { RetryAfter time.Duration Now func() time.Time // contains filtered or unexported fields }
Queue is a priority queue of repair jobs paired with a priority queue of jobs to be retried once they are eligible. A secondary index on streamID+position is kept to allow updates to the health (priority) of jobs already in one of the queues.
func NewQueue ¶
func NewQueue(log *zap.Logger, retryAfter time.Duration, initialAlloc, memReleaseThreshold int) (*Queue, error)
NewQueue creates a new Queue.
func (*Queue) Clean ¶
Clean removes all items from the queues that were last updated before the given time. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this placement will block until this is complete.
This could conceivably take a context parameter and allow the cleanQueue part of the operation to be canceled, but since the heap.Init and reindex parts would still need to run to completion, that seems mostly unhelpful. Alternatively, we could call heap.Init and update the index after every item is removed. That would allow cancellation at any point, but would probably be slower (potentially many more updates to the index map). Still, that is an option if it turns out we need to be able to cancel Clean operations partway through.
Returns the total number of items removed from the queues.
func (*Queue) Delete ¶ added in v1.125.2
Delete removes a segment from the queue by streamID and position, whether it is in the repair queue or the retry queue. Returns true if the segment was found and removed, and false if it was not found.
func (*Queue) Destroy ¶
func (q *Queue) Destroy()
Destroy stops the queue's funnel goroutine (if it is still running) and frees the associated memory.
func (*Queue) Insert ¶
Insert adds a job to the queue with the given health. If the segment is already in the repair queue or the retry queue, the job record is updated and left in the queue (with its position updated as necessary)
When a job is updated, its InsertedAt value is preserved, its UpdatedAt field is set to the current time, and the new NumAttempts field is added to the previously existing value.
If the job is not already in either queue and its LastAttemptedAt field is recent enough (as determined by RetryAfter), it is added to the retry queue instead of the repair queue, to wait until it is eligible for another try.
Returns true if the job was newly added to a queue, and false if an existing entry in the target queue was updated.
func (*Queue) Inspect ¶
Inspect finds a repair job in the queue by streamID and position and returns all of the job information.
func (*Queue) Len ¶
Len returns the number of segments in the repair queue and the retry queue, respectively.
func (*Queue) Peek ¶
Peek returns the segment with the lowest health without removing it from the queue. If there are no segments in the queue, it returns a zero UUID and position.
func (*Queue) PeekRetry ¶
PeekRetry returns the segment with the smallest LastUpdatedAt value in the retry queue without removing it from the queue. If there are no segments in the queue, it returns a zero UUID and position.
func (*Queue) Pop ¶
Pop removes and returns the segment with the lowest health from the repair queue. If there are no segments in the queue, it returns a zero job and ok=false.
func (*Queue) ResetTimer ¶
ResetTimer causes the funnel goroutine to wake up and adjust its wait timer (might be used after artificially changing the clock, for example).
func (*Queue) Start ¶
Start starts the queue's funnel goroutine, which moves items from the retry queue to the repair queue as they become eligible for retry (after RetryAfter). If the queue is already running, it returns an error.
func (*Queue) Stat ¶ added in v1.125.2
Stat performs some analysis of the items in the queue and returns some related statistics. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this queue will block until this is complete.
func (*Queue) TestingSetAttemptedTime ¶ added in v1.125.2
func (q *Queue) TestingSetAttemptedTime(streamID uuid.UUID, position uint64, lastAttemptedAt time.Time) (rowsAffected int)
TestingSetAttemptedTime sets the LastAttemptedAt field for a segment in the queue by streamID and position. It returns the number of jobs affected (this will be 0 or 1).
func (*Queue) TestingSetUpdatedTime ¶ added in v1.126.2
func (q *Queue) TestingSetUpdatedTime(streamID uuid.UUID, position uint64, updatedAt time.Time) (rowsAffected int)
TestingSetUpdatedTime sets the UpdatedAt field for a segment in the queue by streamID and position. It returns the number of jobs affected (this will be 0 or 1).
func (*Queue) Trim ¶
Trim removes all items from the queues with health greater than the given value. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this placement will block until this is complete.
This could conceivably take a context parameter and allow the trimQueue part of the operation to be canceled, but since the heap.Init and reindex parts would still need to run to completion, that seems mostly unhelpful. Alternatively, we could call heap.Init and update the index after every item is removed. That would allow cancellation at any point, but would probably be slower (potentially many more updates to the index map). Still, that is an option if it turns out we need to be able to cancel Trim operations partway through.
Returns the total number of items removed from the queues.