jobq

package
v1.127.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2025 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const ServerTimeNow uint64 = 123456789 // 1973-11-29T21:33:09Z

ServerTimeNow is a flag indicating that the server should fill in its idea of the current time for the indicated field. This must be a value that is preserved through a uint64->time.Time->uint64 conversion round trip and is unlikely to be used naturally.

Variables

View Source
var ErrJobNotFound = errors.New("job not found")

ErrJobNotFound is returned by the client when a particular job is not found.

View Source
var ErrQueueEmpty = errors.New("queue is empty")

ErrQueueEmpty is returned by the client when the queue is empty.

View Source
var RecordSize = unsafe.Sizeof(RepairJob{})

RecordSize is the size of a RepairJob record in bytes. It includes any padding that may be added by the compiler to align the record to a multiple of the word size for the target arch.

Functions

func ConvertJobToProtobuf

func ConvertJobToProtobuf(job RepairJob) *pb.RepairJob

ConvertJobToProtobuf converts a RepairJob record to a protobuf representation.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client wraps a DRPCJobQueueClient.

func WrapConn

func WrapConn(conn *rpc.Conn) *Client

WrapConn wraps an existing connection in a client.

func (*Client) Clean

func (c *Client) Clean(ctx context.Context, placement storj.PlacementConstraint, updatedBefore time.Time) (removedSegments int32, err error)

Clean removes all jobs with UpdatedAt time before the given cutoff.

func (*Client) CleanAll added in v1.125.2

func (c *Client) CleanAll(ctx context.Context, updatedBefore time.Time) (removedSegments int32, err error)

CleanAll removes all jobs with UpdatedAt time before the given cutoff from all placement queues.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying connection.

func (*Client) Delete added in v1.125.2

func (c *Client) Delete(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64) (wasDeleted bool, err error)

Delete removes a specific job from the indicated queue.

func (*Client) Inspect

func (c *Client) Inspect(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64) (job RepairJob, err error)

Inspect finds a job in the queue by streamID, position, and placement, and returns all of the job information. If the job is not found, it returns ErrJobNotFound.

func (*Client) Len

func (c *Client) Len(ctx context.Context, placement storj.PlacementConstraint) (repairLen, retryLen int64, err error)

Len returns the number of items in the indicated job queue.

func (*Client) LenAll added in v1.125.2

func (c *Client) LenAll(ctx context.Context) (repairLen, retryLen int64, err error)

LenAll sums up the number of items in all queues on the server.

func (*Client) Peek

func (c *Client) Peek(ctx context.Context, limit int, includedPlacements, excludedPlacements []storj.PlacementConstraint) (jobs []RepairJob, err error)

Peek returns the 'limit' lowest-health items from the indicated job queues without removing them. If there are less than 'limit' items in all the queues, it returns all of them.

func (*Client) Pop

func (c *Client) Pop(ctx context.Context, limit int, includedPlacements, excludedPlacements []storj.PlacementConstraint) (jobs []RepairJob, err error)

Pop removes and returns the 'limit' lowest-health items from the indicated job queues. If there are less than 'limit' items in the queue, it removes and returns all of them.

func (*Client) Push

func (c *Client) Push(ctx context.Context, job RepairJob) (wasNew bool, err error)

Push adds a new item to the job queue with the given health.

It returns an indication of whether the given segment was newly inserted or if it already existed in the target queue.

func (*Client) PushBatch

func (c *Client) PushBatch(ctx context.Context, jobs []RepairJob) (wasNew []bool, err error)

PushBatch adds multiple items to the appropriate job queues with the given health values.

It returns a slice of booleans indicating whether each segment was newly inserted or if it already existed in the target queue.

func (*Client) Stat added in v1.125.2

func (c *Client) Stat(ctx context.Context, placement storj.PlacementConstraint) (stat QueueStat, err error)

Stat collects statistics about the indicated job queue.

func (*Client) StatAll added in v1.125.2

func (c *Client) StatAll(ctx context.Context) (stats []QueueStat, err error)

StatAll collects statistics about all job queues on a server.

func (*Client) TestingSetAttemptedTime added in v1.125.2

func (c *Client) TestingSetAttemptedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64, t time.Time) (rowsAffected int64, err error)

TestingSetAttemptedTime sets the LastAttemptedAt field of a specific job. This is only intended for testing scenarios.

func (*Client) TestingSetUpdatedTime added in v1.126.2

func (c *Client) TestingSetUpdatedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64, t time.Time) (rowsAffected int64, err error)

TestingSetUpdatedTime sets the UpdatedAt field of a specific job. This is only intended for testing scenarios.

func (*Client) Trim

func (c *Client) Trim(ctx context.Context, placement storj.PlacementConstraint, healthGreaterThan float64) (removedSegments int32, err error)

Trim removes all jobs with Health greater than the given threshold.

func (*Client) TrimAll added in v1.125.2

func (c *Client) TrimAll(ctx context.Context, healthGreaterThan float64) (removedSegments int32, err error)

TrimAll removes all jobs with UpdatedAt time before the given cutoff from all placement queues.

func (*Client) Truncate

func (c *Client) Truncate(ctx context.Context, placement storj.PlacementConstraint) error

Truncate removes all items from a job queue.

func (*Client) TruncateAll added in v1.125.2

func (c *Client) TruncateAll(ctx context.Context) error

TruncateAll removes all items from all job queues on a server.

type Config added in v1.125.2

type Config struct {
	ServerNodeURL storj.NodeURL `help:"\"node URL\" of the job queue server" default:"" testDefault:""`
	TLS           tlsopts.Config
}

Config holds the Storj-style configuration for a job queue client.

type QueueStat added in v1.125.2

type QueueStat struct {
	Placement        storj.PlacementConstraint
	Count            int64
	MaxInsertedAt    time.Time
	MinInsertedAt    time.Time
	MaxAttemptedAt   *time.Time
	MinAttemptedAt   *time.Time
	MinSegmentHealth float64
	MaxSegmentHealth float64
}

QueueStat contains statistics about a queue or set of queues.

type RepairJob

type RepairJob struct {
	ID                SegmentIdentifier
	Health            float64
	InsertedAt        uint64
	LastAttemptedAt   uint64
	UpdatedAt         uint64
	NumAttempts       uint16
	Placement         uint16
	NumMissing        uint16
	NumOutOfPlacement uint16
}

RepairJob represents the in-memory structure of a job in the queue. This structure does not _need_ to be a multiple of 64 bits in size, but it's probably going to be aligned to a multiple of 64 bits in memory either way, so it's more efficient if we use that whole space.

func ConvertJobFromProtobuf

func ConvertJobFromProtobuf(protoJob *pb.RepairJob) (RepairJob, error)

ConvertJobFromProtobuf converts a protobuf representation of a RepairJob to a RepairJob record.

func (RepairJob) LastAttemptedAtTime

func (rj RepairJob) LastAttemptedAtTime() time.Time

LastAttemptedAtTime returns the LastAttemptedAt field as a time.Time.

type RepairJobQueue added in v1.125.2

type RepairJobQueue struct {
	// contains filtered or unexported fields
}

RepairJobQueue is a Storj-style repair queue, meant to be a near drop-in replacement for the PostgreSQL arrangement.

func OpenJobQueue added in v1.126.2

func OpenJobQueue(ctx context.Context, fi *identity.FullIdentity, config Config) (*RepairJobQueue, error)

OpenJobQueue opens a RepairJobQueue with the given configuration.

func WrapJobQueue added in v1.127.1

func WrapJobQueue(cli *Client) *RepairJobQueue

WrapJobQueue wraps a jobq Client to become a RepairJobQueue.

func (*RepairJobQueue) Clean added in v1.125.2

func (rjq *RepairJobQueue) Clean(ctx context.Context, updatedBefore time.Time) (int64, error)

Clean removes all segments from the repair queue that were last updated before the given time. It returns the number of segments removed.

func (*RepairJobQueue) Close added in v1.125.2

func (rjq *RepairJobQueue) Close() error

Close closes the connection to the job queue server.

func (*RepairJobQueue) Count added in v1.125.2

func (rjq *RepairJobQueue) Count(ctx context.Context) (int, error)

Count returns the number of segments in the repair queues, including all placement queues and including retry queues.

func (*RepairJobQueue) Delete added in v1.125.2

func (rjq *RepairJobQueue) Delete(ctx context.Context, job queue.InjuredSegment) error

Delete removes a specific segment from its repair queue.

func (*RepairJobQueue) Insert added in v1.125.2

func (rjq *RepairJobQueue) Insert(ctx context.Context, job *queue.InjuredSegment) (alreadyInserted bool, err error)

Insert adds a segment to the appropriate repair queue. If the segment is already in the queue, it is not added again. wasInserted is true if the segment was already added to the queue, false if it was inserted by this call.

func (*RepairJobQueue) InsertBatch added in v1.125.2

func (rjq *RepairJobQueue) InsertBatch(ctx context.Context, segments []*queue.InjuredSegment) (newlyInsertedSegments []*queue.InjuredSegment, err error)

InsertBatch adds multiple segments to the appropriate repair queues. If a segment is already in the queue, it is not added again. newlyInsertedSegments is the list of segments that were added to the queue.

func (*RepairJobQueue) Release added in v1.125.2

func (rjq *RepairJobQueue) Release(ctx context.Context, job queue.InjuredSegment, repaired bool) error

Release does what's necessary to mark a repair job as succeeded or failed. In the case of RepairJobQueue, Release puts a segment back into the queue if it has failed.

func (*RepairJobQueue) Select added in v1.125.2

func (rjq *RepairJobQueue) Select(ctx context.Context, limit int, includedPlacements []storj.PlacementConstraint, excludedPlacements []storj.PlacementConstraint) ([]queue.InjuredSegment, error)

Select removes and returns up to limit segments from the repair queue that match the given placement constraints. If includedPlacements is non-empty, only segments with placements in includedPlacements are returned. If excludedPlacements is non-empty, only segments with placements not in excludedPlacements are returned.

func (*RepairJobQueue) SelectN added in v1.125.2

func (rjq *RepairJobQueue) SelectN(ctx context.Context, limit int) ([]queue.InjuredSegment, error)

SelectN returns up to limit segments from the repair queues- whichever segments from any queue have the lowest health, only including segments that are currently eligible for repair.

Note that this is very different behavior from Select(); these segments are not removed from their queues. The similarity in naming is regrettable.

func (*RepairJobQueue) Stat added in v1.125.2

func (rjq *RepairJobQueue) Stat(ctx context.Context) ([]queue.Stat, error)

Stat returns statistics about the repair queues. Note: this is expensive! It requires a full scan of all queues.

func (*RepairJobQueue) TestingSetAttemptedTime added in v1.125.2

func (rjq *RepairJobQueue) TestingSetAttemptedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)

TestingSetAttemptedTime is a testing-only method that sets the LastAttemptedAt field of a segment in the repair queue. It is not intended for production use.

func (*RepairJobQueue) TestingSetUpdatedTime added in v1.126.2

func (rjq *RepairJobQueue) TestingSetUpdatedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)

TestingSetUpdatedTime is a testing-only method that sets the UpdatedAt field of a segment in the repair queue. It is not intended for production use.

type SegmentIdentifier

type SegmentIdentifier struct {
	// StreamID is the stream ID of the segment.
	StreamID uuid.UUID
	// Position is the position of the segment.
	Position uint64
}

SegmentIdentifier identifies individual segments in the repair queue.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳