Documentation ¶
Index ¶
- type JobqEndpoint
- func (se *JobqEndpoint) Clean(ctx context.Context, req *pb.JobQueueCleanRequest) (_ *pb.JobQueueCleanResponse, err error)
- func (se *JobqEndpoint) Delete(ctx context.Context, req *pb.JobQueueDeleteRequest) (_ *pb.JobQueueDeleteResponse, err error)
- func (se *JobqEndpoint) Inspect(ctx context.Context, req *pb.JobQueueInspectRequest) (_ *pb.JobQueueInspectResponse, err error)
- func (se *JobqEndpoint) Len(ctx context.Context, req *pb.JobQueueLengthRequest) (_ *pb.JobQueueLengthResponse, err error)
- func (se *JobqEndpoint) Peek(ctx context.Context, req *pb.JobQueuePeekRequest) (_ *pb.JobQueuePeekResponse, err error)
- func (se *JobqEndpoint) Pop(ctx context.Context, req *pb.JobQueuePopRequest) (_ *pb.JobQueuePopResponse, err error)
- func (se *JobqEndpoint) Push(ctx context.Context, req *pb.JobQueuePushRequest) (_ *pb.JobQueuePushResponse, err error)
- func (se *JobqEndpoint) PushBatch(ctx context.Context, req *pb.JobQueuePushBatchRequest) (_ *pb.JobQueuePushBatchResponse, err error)
- func (se *JobqEndpoint) Stat(ctx context.Context, req *pb.JobQueueStatRequest) (_ *pb.JobQueueStatResponse, err error)
- func (se *JobqEndpoint) TestingSetAttemptedTime(ctx context.Context, req *pb.JobQueueTestingSetAttemptedTimeRequest) (_ *pb.JobQueueTestingSetAttemptedTimeResponse, err error)
- func (se *JobqEndpoint) TestingSetUpdatedTime(ctx context.Context, req *pb.JobQueueTestingSetUpdatedTimeRequest) (_ *pb.JobQueueTestingSetUpdatedTimeResponse, err error)
- func (se *JobqEndpoint) Trim(ctx context.Context, req *pb.JobQueueTrimRequest) (_ *pb.JobQueueTrimResponse, err error)
- func (se *JobqEndpoint) Truncate(ctx context.Context, req *pb.JobQueueTruncateRequest) (_ *pb.JobQueueTruncateResponse, err error)
- type QueueMap
- func (qm *QueueMap) ChooseQueues(includedPlacements, excludedPlacements []storj.PlacementConstraint) map[storj.PlacementConstraint]*jobqueue.Queue
- func (qm *QueueMap) GetAllQueues() map[storj.PlacementConstraint]*jobqueue.Queue
- func (qm *QueueMap) GetQueue(placement storj.PlacementConstraint) (q *jobqueue.Queue, err error)
- func (qm *QueueMap) StopAll()
- type Server
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type JobqEndpoint ¶
type JobqEndpoint struct {
// contains filtered or unexported fields
}
JobqEndpoint implements the DRPCJobQueueServer interface.
func NewEndpoint ¶
func NewEndpoint(log *zap.Logger, queues *QueueMap) *JobqEndpoint
NewEndpoint creates a new endpoint.
func (*JobqEndpoint) Clean ¶
func (se *JobqEndpoint) Clean(ctx context.Context, req *pb.JobQueueCleanRequest) (_ *pb.JobQueueCleanResponse, err error)
Clean removes all jobs from the queue that were last updated before the requested time. If the given placement is negative, all queues are cleaned.
func (*JobqEndpoint) Delete ¶ added in v1.125.2
func (se *JobqEndpoint) Delete(ctx context.Context, req *pb.JobQueueDeleteRequest) (_ *pb.JobQueueDeleteResponse, err error)
Delete removes a specific job from the queue by its placement, streamID, and position.
func (*JobqEndpoint) Inspect ¶
func (se *JobqEndpoint) Inspect(ctx context.Context, req *pb.JobQueueInspectRequest) (_ *pb.JobQueueInspectResponse, err error)
Inspect finds a particular job in the queue by its placement, streamID, and position and returns all of the job information.
func (*JobqEndpoint) Len ¶
func (se *JobqEndpoint) Len(ctx context.Context, req *pb.JobQueueLengthRequest) (_ *pb.JobQueueLengthResponse, err error)
Len returns the number of jobs in the queues for the requested placement.
func (*JobqEndpoint) Peek ¶
func (se *JobqEndpoint) Peek(ctx context.Context, req *pb.JobQueuePeekRequest) (_ *pb.JobQueuePeekResponse, err error)
Peek returns the lowest-health job from the queues for the requested placement without removing the job from its queue.
func (*JobqEndpoint) Pop ¶
func (se *JobqEndpoint) Pop(ctx context.Context, req *pb.JobQueuePopRequest) (_ *pb.JobQueuePopResponse, err error)
Pop removes and returns the 'limit' lowest-health jobs from the queues for the requested placements.
func (*JobqEndpoint) Push ¶
func (se *JobqEndpoint) Push(ctx context.Context, req *pb.JobQueuePushRequest) (_ *pb.JobQueuePushResponse, err error)
Push inserts a job into the appropriate queue for its placement.
func (*JobqEndpoint) PushBatch ¶
func (se *JobqEndpoint) PushBatch(ctx context.Context, req *pb.JobQueuePushBatchRequest) (_ *pb.JobQueuePushBatchResponse, err error)
PushBatch inserts multiple jobs into the appropriate queues for their placements.
func (*JobqEndpoint) Stat ¶ added in v1.125.2
func (se *JobqEndpoint) Stat(ctx context.Context, req *pb.JobQueueStatRequest) (_ *pb.JobQueueStatResponse, err error)
Stat returns statistics about the queues for the requested placement. Note: this is expensive! It requires a full scan of the target queues.
func (*JobqEndpoint) TestingSetAttemptedTime ¶ added in v1.125.2
func (se *JobqEndpoint) TestingSetAttemptedTime(ctx context.Context, req *pb.JobQueueTestingSetAttemptedTimeRequest) (_ *pb.JobQueueTestingSetAttemptedTimeResponse, err error)
TestingSetAttemptedTime sets the attempted time for a specific job in the queue. This is a testing-only method.
func (*JobqEndpoint) TestingSetUpdatedTime ¶ added in v1.126.2
func (se *JobqEndpoint) TestingSetUpdatedTime(ctx context.Context, req *pb.JobQueueTestingSetUpdatedTimeRequest) (_ *pb.JobQueueTestingSetUpdatedTimeResponse, err error)
TestingSetUpdatedTime sets the updated time for a specific job in the queue. This is a testing-only method.
func (*JobqEndpoint) Trim ¶
func (se *JobqEndpoint) Trim(ctx context.Context, req *pb.JobQueueTrimRequest) (_ *pb.JobQueueTrimResponse, err error)
Trim removes all jobs from the queue with health greater than the given value. If the given placement is negative, all queues are trimmed.
func (*JobqEndpoint) Truncate ¶
func (se *JobqEndpoint) Truncate(ctx context.Context, req *pb.JobQueueTruncateRequest) (_ *pb.JobQueueTruncateResponse, err error)
Truncate removes all jobs from the queue for the requested placement. The queue is not destroyed.
type QueueMap ¶
type QueueMap struct {
// contains filtered or unexported fields
}
QueueMap is a thread-safe mapping of placement constraints to queues.
func NewQueueMap ¶
func NewQueueMap(log *zap.Logger, queueFactory func(storj.PlacementConstraint) (*jobqueue.Queue, error)) *QueueMap
NewQueueMap creates a new QueueMap.
func (*QueueMap) ChooseQueues ¶ added in v1.125.2
func (qm *QueueMap) ChooseQueues(includedPlacements, excludedPlacements []storj.PlacementConstraint) map[storj.PlacementConstraint]*jobqueue.Queue
ChooseQueues returns a map of queues that match the given placement constraints. If includedPlacements is non-empty, only queues for the given placements are returned. If excludedPlacements is non-empty, queues for the given placements are excluded.
func (*QueueMap) GetAllQueues ¶
func (qm *QueueMap) GetAllQueues() map[storj.PlacementConstraint]*jobqueue.Queue
GetAllQueues gets a copy of the current queue map. It is possible for another caller to have destroyed one or more queues between this call and the time when the caller uses the returned map. If this happens, the affected queues will simply appear empty.
type Server ¶
type Server struct {
	QueueMap *QueueMap
	// contains filtered or unexported fields
}
Server represents a job queue DRPC server.
func New ¶
func New(log *zap.Logger, listener net.Listener, tlsOpts *tlsopts.Options, retryAfter time.Duration, initAlloc, maxItems, memReleaseThreshold int) (*Server, error)
New creates a new Server instance.
func (*Server) Run ¶
Run runs the server (accepting connections on the listener) until the context is canceled.
func (*Server) SetTimeFunc ¶
SetTimeFunc sets the time function for all queues currently initialized in the server. This is primarily used in tests to control the timestamps used by the queues.
Importantly, this does not affect queues that are initialized after this point.