jobqueue

package
v1.126.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2025 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PeekNMultipleQueues added in v1.125.2

func PeekNMultipleQueues(limit int, queueMap map[storj.PlacementConstraint]*Queue) (jobs []jobq.RepairJob)

PeekNMultipleQueues returns the 'limit' segments with the lowest health from any of the given queues without removing them from the queues. If there are fewer than 'limit' segments in all of the queues, it returns all available. Checks only the repair queues, not the retry queues.

This function is useful for combining multiple queues into a single view of the lowest health segments across all of them. Older repair code expects a single queue containing all placements and all jobs whether eligible for retry or not, so this function allows similar usage. This is not very performant, but as far as I can tell we only need this in test situations.

func PopNMultipleQueues added in v1.125.2

func PopNMultipleQueues(limit int, queueMap map[storj.PlacementConstraint]*Queue) (jobs []jobq.RepairJob)

PopNMultipleQueues removes and returns the 'limit' segments with the lowest health from any of the given queues without removing them from the queues. If there are fewer than 'limit' segments in all of the queues, it returns all available. Checks only the repair queues, not the retry queues.

This function is useful for combining multiple queues into a single view of the lowest health segments across all of them. Older repair code expects a single queue containing all placements and all jobs whether eligible for retry or not, so this function allows similar usage. Hopefully soon we can teach the repair workers to ask for jobs from each placement separately.

Types

type Queue

type Queue struct {
	RetryAfter time.Duration
	Now        func() time.Time
	// contains filtered or unexported fields
}

Queue is a priority queue of repair jobs paired with a priority queue of jobs to be retried once they are eligible. A secondary index on streamID+position is kept to allow updates to the health (priority) of jobs already in one of the queues.

func NewQueue

func NewQueue(log *zap.Logger, retryAfter time.Duration, initialAlloc, memReleaseThreshold int) (*Queue, error)

NewQueue creates a new Queue.

func (*Queue) Clean

func (q *Queue) Clean(updatedBefore time.Time) (removed int)

Clean removes all items from the queues that were last updated before the given time. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this placement will block until this is complete.

This could conceivably take a context parameter and allow the cleanQueue part of the operation to be canceled, but since the heap.Init and reindex parts would still need to run to completion, that seems mostly unhelpful. Alternatively, we could call heap.Init and update the index after every item is removed. That would allow cancellation at any point, but would probably be slower (potentially many more updates to the index map). Still, that is an option if it turns out we need to be able to cancel Clean operations partway through.

Returns the total number of items removed from the queues.

func (*Queue) Delete added in v1.125.2

func (q *Queue) Delete(streamID uuid.UUID, position uint64) (wasDeleted bool)

Delete removes a segment from the queue by streamID and position, whether it is in the repair queue or the retry queue. Returns true if the segment was found and removed, and false if it was not found.

func (*Queue) Destroy

func (q *Queue) Destroy()

Destroy stops the queue's funnel goroutine (if it is still running) and frees the associated memory.

func (*Queue) Insert

func (q *Queue) Insert(job jobq.RepairJob) (wasNew bool)

Insert adds a job to the queue with the given health. If the segment is already in the repair queue or the retry queue, the job record is updated and left in the queue (with its position updated as necessary)

When a job is updated, its InsertedAt value is preserved, its UpdatedAt field is set to the current time, and the new NumAttempts field is added to the previously existing value.

If the job is not already in either queue and its LastAttemptedAt field is recent enough (as determined by RetryAfter), it is added to the retry queue instead of the repair queue, to wait until it is eligible for another try.

Returns true if the job was newly added to a queue, and false if an existing entry in the target queue was updated.

func (*Queue) Inspect

func (q *Queue) Inspect(streamID uuid.UUID, position uint64) (job jobq.RepairJob, ok bool)

Inspect finds a repair job in the queue by streamID and position and returns all of the job information.

func (*Queue) Len

func (q *Queue) Len() (inRepair, inRetry int64)

Len returns the number of segments in the repair queue and the retry queue, respectively.

func (*Queue) Peek

func (q *Queue) Peek() (job jobq.RepairJob, ok bool)

Peek returns the segment with the lowest health without removing it from the queue. If there are no segments in the queue, it returns a zero UUID and position.

func (*Queue) PeekRetry

func (q *Queue) PeekRetry() jobq.RepairJob

PeekRetry returns the segment with the smallest LastUpdatedAt value in the retry queue without removing it from the queue. If there are no segments in the queue, it returns a zero UUID and position.

func (*Queue) Pop

func (q *Queue) Pop() (job jobq.RepairJob, ok bool)

Pop removes and returns the segment with the lowest health from the repair queue. If there are no segments in the queue, it returns a zero job and ok=false.

func (*Queue) ResetTimer

func (q *Queue) ResetTimer() error

ResetTimer causes the funnel goroutine to wake up and adjust its wait timer (might be used after artificially changing the clock, for example).

func (*Queue) Start

func (q *Queue) Start() error

Start starts the queue's funnel goroutine, which moves items from the retry queue to the repair queue as they become eligible for retry (after RetryAfter). If the queue is already running, it returns an error.

func (*Queue) Stat added in v1.125.2

func (q *Queue) Stat(ctx context.Context) (repairStat, retryStat jobq.QueueStat, err error)

Stat performs some analysis of the items in the queue and returns some related statistics. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this queue will block until this is complete.

func (*Queue) Stop

func (q *Queue) Stop()

Stop stops the queue's funnel goroutine.

func (*Queue) TestingSetAttemptedTime added in v1.125.2

func (q *Queue) TestingSetAttemptedTime(streamID uuid.UUID, position uint64, lastAttemptedAt time.Time) (rowsAffected int)

TestingSetAttemptedTime sets the LastAttemptedAt field for a segment in the queue by streamID and position. It returns the number of jobs affected (this will be 0 or 1).

func (*Queue) TestingSetUpdatedTime added in v1.126.2

func (q *Queue) TestingSetUpdatedTime(streamID uuid.UUID, position uint64, updatedAt time.Time) (rowsAffected int)

TestingSetUpdatedTime sets the UpdatedAt field for a segment in the queue by streamID and position. It returns the number of jobs affected (this will be 0 or 1).

func (*Queue) Trim

func (q *Queue) Trim(healthGreaterThan float64) (removed int)

Trim removes all items from the queues with health greater than the given value. This is a relatively expensive operation at O(n). The queues for this placement are left locked for the duration of the operation; all reads and writes to this placement will block until this is complete.

This could conceivably take a context parameter and allow the trimQueue part of the operation to be canceled, but since the heap.Init and reindex parts would still need to run to completion, that seems mostly unhelpful. Alternatively, we could call heap.Init and update the index after every item is removed. That would allow cancellation at any point, but would probably be slower (potentially many more updates to the index map). Still, that is an option if it turns out we need to be able to cancel Trim operations partway through.

Returns the total number of items removed from the queues.

func (*Queue) Truncate

func (q *Queue) Truncate()

Truncate removes all items currently in the queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳