Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertJobToProtobuf(job RepairJob) *pb.RepairJob
- type Client
- func (c *Client) Clean(ctx context.Context, placement storj.PlacementConstraint, ...) (removedSegments int32, err error)
- func (c *Client) CleanAll(ctx context.Context, updatedBefore time.Time) (removedSegments int32, err error)
- func (c *Client) Close() error
- func (c *Client) Delete(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, ...) (wasDeleted bool, err error)
- func (c *Client) Inspect(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, ...) (job RepairJob, err error)
- func (c *Client) Len(ctx context.Context, placement storj.PlacementConstraint) (repairLen, retryLen int64, err error)
- func (c *Client) LenAll(ctx context.Context) (repairLen, retryLen int64, err error)
- func (c *Client) Peek(ctx context.Context, limit int, ...) (jobs []RepairJob, err error)
- func (c *Client) Pop(ctx context.Context, limit int, ...) (jobs []RepairJob, err error)
- func (c *Client) Push(ctx context.Context, job RepairJob) (wasNew bool, err error)
- func (c *Client) PushBatch(ctx context.Context, jobs []RepairJob) (wasNew []bool, err error)
- func (c *Client) Stat(ctx context.Context, placement storj.PlacementConstraint) (stat QueueStat, err error)
- func (c *Client) StatAll(ctx context.Context) (stats []QueueStat, err error)
- func (c *Client) TestingSetAttemptedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, ...) (rowsAffected int64, err error)
- func (c *Client) TestingSetUpdatedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, ...) (rowsAffected int64, err error)
- func (c *Client) Trim(ctx context.Context, placement storj.PlacementConstraint, ...) (removedSegments int32, err error)
- func (c *Client) TrimAll(ctx context.Context, healthGreaterThan float64) (removedSegments int32, err error)
- func (c *Client) Truncate(ctx context.Context, placement storj.PlacementConstraint) error
- func (c *Client) TruncateAll(ctx context.Context) error
- type Config
- type QueueStat
- type RepairJob
- type RepairJobQueue
- func (rjq *RepairJobQueue) Clean(ctx context.Context, updatedBefore time.Time) (int64, error)
- func (rjq *RepairJobQueue) Close() error
- func (rjq *RepairJobQueue) Count(ctx context.Context) (int, error)
- func (rjq *RepairJobQueue) Delete(ctx context.Context, job queue.InjuredSegment) error
- func (rjq *RepairJobQueue) Insert(ctx context.Context, job *queue.InjuredSegment) (alreadyInserted bool, err error)
- func (rjq *RepairJobQueue) InsertBatch(ctx context.Context, segments []*queue.InjuredSegment) (newlyInsertedSegments []*queue.InjuredSegment, err error)
- func (rjq *RepairJobQueue) Release(ctx context.Context, job queue.InjuredSegment, repaired bool) error
- func (rjq *RepairJobQueue) Select(ctx context.Context, limit int, includedPlacements []storj.PlacementConstraint, ...) ([]queue.InjuredSegment, error)
- func (rjq *RepairJobQueue) SelectN(ctx context.Context, limit int) ([]queue.InjuredSegment, error)
- func (rjq *RepairJobQueue) Stat(ctx context.Context) ([]queue.Stat, error)
- func (rjq *RepairJobQueue) TestingSetAttemptedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, ...) (rowsAffected int64, err error)
- func (rjq *RepairJobQueue) TestingSetUpdatedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, ...) (rowsAffected int64, err error)
- type SegmentIdentifier
Constants ¶
const ServerTimeNow uint64 = 123456789 // 1973-11-29T21:33:09Z
ServerTimeNow is a flag indicating that the server should fill in its idea of the current time for the indicated field. This must be a value that is preserved through a uint64->time.Time->uint64 conversion round trip and is unlikely to be used naturally.
Variables ¶
var ErrJobNotFound = errors.New("job not found")
ErrJobNotFound is returned by the client when a particular job is not found.
var ErrQueueEmpty = errors.New("queue is empty")
ErrQueueEmpty is returned by the client when the queue is empty.
var RecordSize = unsafe.Sizeof(RepairJob{})
RecordSize is the size of a RepairJob record in bytes. It includes any padding that may be added by the compiler to align the record to a multiple of the word size for the target arch.
Functions ¶
func ConvertJobToProtobuf ¶
ConvertJobToProtobuf converts a RepairJob record to a protobuf representation.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client wraps a DRPCJobQueueClient.
func (*Client) Clean ¶
func (c *Client) Clean(ctx context.Context, placement storj.PlacementConstraint, updatedBefore time.Time) (removedSegments int32, err error)
Clean removes all jobs with UpdatedAt time before the given cutoff.
func (*Client) CleanAll ¶ added in v1.125.2
func (c *Client) CleanAll(ctx context.Context, updatedBefore time.Time) (removedSegments int32, err error)
CleanAll removes all jobs with UpdatedAt time before the given cutoff from all placement queues.
func (*Client) Delete ¶ added in v1.125.2
func (c *Client) Delete(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64) (wasDeleted bool, err error)
Delete removes a specific job from the indicated queue.
func (*Client) Inspect ¶
func (c *Client) Inspect(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64) (job RepairJob, err error)
Inspect finds a job in the queue by streamID, position, and placement, and returns all of the job information. If the job is not found, it returns ErrJobNotFound.
func (*Client) Len ¶
func (c *Client) Len(ctx context.Context, placement storj.PlacementConstraint) (repairLen, retryLen int64, err error)
Len returns the number of items in the indicated job queue.
func (*Client) LenAll ¶ added in v1.125.2
LenAll sums up the number of items in all queues on the server.
func (*Client) Peek ¶
func (c *Client) Peek(ctx context.Context, limit int, includedPlacements, excludedPlacements []storj.PlacementConstraint) (jobs []RepairJob, err error)
Peek returns the 'limit' lowest-health items from the indicated job queues without removing them. If there are less than 'limit' items in all the queues, it returns all of them.
func (*Client) Pop ¶
func (c *Client) Pop(ctx context.Context, limit int, includedPlacements, excludedPlacements []storj.PlacementConstraint) (jobs []RepairJob, err error)
Pop removes and returns the 'limit' lowest-health items from the indicated job queues. If there are less than 'limit' items in the queue, it removes and returns all of them.
func (*Client) Push ¶
Push adds a new item to the job queue with the given health.
It returns an indication of whether the given segment was newly inserted or if it already existed in the target queue.
func (*Client) PushBatch ¶
PushBatch adds multiple items to the appropriate job queues with the given health values.
It returns a slice of booleans indicating whether each segment was newly inserted or if it already existed in the target queue.
func (*Client) Stat ¶ added in v1.125.2
func (c *Client) Stat(ctx context.Context, placement storj.PlacementConstraint) (stat QueueStat, err error)
Stat collects statistics about the indicated job queue.
func (*Client) StatAll ¶ added in v1.125.2
StatAll collects statistics about all job queues on a server.
func (*Client) TestingSetAttemptedTime ¶ added in v1.125.2
func (c *Client) TestingSetAttemptedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64, t time.Time) (rowsAffected int64, err error)
TestingSetAttemptedTime sets the LastAttemptedAt field of a specific job. This is only intended for testing scenarios.
func (*Client) TestingSetUpdatedTime ¶ added in v1.126.2
func (c *Client) TestingSetUpdatedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position uint64, t time.Time) (rowsAffected int64, err error)
TestingSetUpdatedTime sets the UpdatedAt field of a specific job. This is only intended for testing scenarios.
func (*Client) Trim ¶
func (c *Client) Trim(ctx context.Context, placement storj.PlacementConstraint, healthGreaterThan float64) (removedSegments int32, err error)
Trim removes all jobs with Health greater than the given threshold.
func (*Client) TrimAll ¶ added in v1.125.2
func (c *Client) TrimAll(ctx context.Context, healthGreaterThan float64) (removedSegments int32, err error)
TrimAll removes all jobs with UpdatedAt time before the given cutoff from all placement queues.
type Config ¶ added in v1.125.2
type Config struct { ServerNodeURL storj.NodeURL `help:"\"node URL\" of the job queue server" default:"" testDefault:""` TLS tlsopts.Config }
Config holds the Storj-style configuration for a job queue client.
type QueueStat ¶ added in v1.125.2
type QueueStat struct { Placement storj.PlacementConstraint Count int64 MaxInsertedAt time.Time MinInsertedAt time.Time MaxAttemptedAt *time.Time MinAttemptedAt *time.Time MinSegmentHealth float64 MaxSegmentHealth float64 }
QueueStat contains statistics about a queue or set of queues.
type RepairJob ¶
type RepairJob struct { ID SegmentIdentifier Health float64 InsertedAt uint64 LastAttemptedAt uint64 UpdatedAt uint64 NumAttempts uint16 Placement uint16 NumMissing uint16 NumOutOfPlacement uint16 }
RepairJob represents the in-memory structure of a job in the queue. This structure does not _need_ to be a multiple of 64 bits in size, but it's probably going to be aligned to a multiple of 64 bits in memory either way, so it's more efficient if we use that whole space.
func ConvertJobFromProtobuf ¶
ConvertJobFromProtobuf converts a protobuf representation of a RepairJob to a RepairJob record.
func (RepairJob) LastAttemptedAtTime ¶
LastAttemptedAtTime returns the LastAttemptedAt field as a time.Time.
type RepairJobQueue ¶ added in v1.125.2
type RepairJobQueue struct {
// contains filtered or unexported fields
}
RepairJobQueue is a Storj-style repair queue, meant to be a near drop-in replacement for the PostgreSQL arrangement.
func OpenJobQueue ¶ added in v1.126.2
func OpenJobQueue(ctx context.Context, fi *identity.FullIdentity, config Config) (*RepairJobQueue, error)
OpenJobQueue opens a RepairJobQueue with the given configuration.
func WrapJobQueue ¶ added in v1.127.1
func WrapJobQueue(cli *Client) *RepairJobQueue
WrapJobQueue wraps a jobq Client to become a RepairJobQueue.
func (*RepairJobQueue) Clean ¶ added in v1.125.2
Clean removes all segments from the repair queue that were last updated before the given time. It returns the number of segments removed.
func (*RepairJobQueue) Close ¶ added in v1.125.2
func (rjq *RepairJobQueue) Close() error
Close closes the connection to the job queue server.
func (*RepairJobQueue) Count ¶ added in v1.125.2
func (rjq *RepairJobQueue) Count(ctx context.Context) (int, error)
Count returns the number of segments in the repair queues, including all placement queues and including retry queues.
func (*RepairJobQueue) Delete ¶ added in v1.125.2
func (rjq *RepairJobQueue) Delete(ctx context.Context, job queue.InjuredSegment) error
Delete removes a specific segment from its repair queue.
func (*RepairJobQueue) Insert ¶ added in v1.125.2
func (rjq *RepairJobQueue) Insert(ctx context.Context, job *queue.InjuredSegment) (alreadyInserted bool, err error)
Insert adds a segment to the appropriate repair queue. If the segment is already in the queue, it is not added again. wasInserted is true if the segment was already added to the queue, false if it was inserted by this call.
func (*RepairJobQueue) InsertBatch ¶ added in v1.125.2
func (rjq *RepairJobQueue) InsertBatch(ctx context.Context, segments []*queue.InjuredSegment) (newlyInsertedSegments []*queue.InjuredSegment, err error)
InsertBatch adds multiple segments to the appropriate repair queues. If a segment is already in the queue, it is not added again. newlyInsertedSegments is the list of segments that were added to the queue.
func (*RepairJobQueue) Release ¶ added in v1.125.2
func (rjq *RepairJobQueue) Release(ctx context.Context, job queue.InjuredSegment, repaired bool) error
Release does what's necessary to mark a repair job as succeeded or failed. In the case of RepairJobQueue, Release puts a segment back into the queue if it has failed.
func (*RepairJobQueue) Select ¶ added in v1.125.2
func (rjq *RepairJobQueue) Select(ctx context.Context, limit int, includedPlacements []storj.PlacementConstraint, excludedPlacements []storj.PlacementConstraint) ([]queue.InjuredSegment, error)
Select removes and returns up to limit segments from the repair queue that match the given placement constraints. If includedPlacements is non-empty, only segments with placements in includedPlacements are returned. If excludedPlacements is non-empty, only segments with placements not in excludedPlacements are returned.
func (*RepairJobQueue) SelectN ¶ added in v1.125.2
func (rjq *RepairJobQueue) SelectN(ctx context.Context, limit int) ([]queue.InjuredSegment, error)
SelectN returns up to limit segments from the repair queues- whichever segments from any queue have the lowest health, only including segments that are currently eligible for repair.
Note that this is very different behavior from Select(); these segments are not removed from their queues. The similarity in naming is regrettable.
func (*RepairJobQueue) Stat ¶ added in v1.125.2
Stat returns statistics about the repair queues. Note: this is expensive! It requires a full scan of all queues.
func (*RepairJobQueue) TestingSetAttemptedTime ¶ added in v1.125.2
func (rjq *RepairJobQueue) TestingSetAttemptedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
TestingSetAttemptedTime is a testing-only method that sets the LastAttemptedAt field of a segment in the repair queue. It is not intended for production use.
func (*RepairJobQueue) TestingSetUpdatedTime ¶ added in v1.126.2
func (rjq *RepairJobQueue) TestingSetUpdatedTime(ctx context.Context, placement storj.PlacementConstraint, streamID uuid.UUID, position metabase.SegmentPosition, t time.Time) (rowsAffected int64, err error)
TestingSetUpdatedTime is a testing-only method that sets the UpdatedAt field of a segment in the repair queue. It is not intended for production use.
type SegmentIdentifier ¶
type SegmentIdentifier struct { // StreamID is the stream ID of the segment. StreamID uuid.UUID // Position is the position of the segment. Position uint64 }
SegmentIdentifier identifies individual segments in the repair queue.