Documentation
¶
Index ¶
- func Migrate(ctx context.Context, db pgxtype.Querier) error
- func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, ...) error
- func WithTestDb(action func(queries *Queries, db *pgxpool.Pool) error) error
- type DBTX
- type Executor
- type ExecutorRepository
- type InsertMarkerParams
- type Job
- type JobRepository
- type JobRunError
- type JobRunLease
- type LegacyQueueRepository
- type Marker
- type PostgresExecutorRepository
- func (r *PostgresExecutorRepository) GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
- func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
- func (r *PostgresExecutorRepository) StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
- type PostgresJobRepository
- func (r *PostgresJobRepository) CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)
- func (r *PostgresJobRepository) FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)
- func (r *PostgresJobRepository) FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, ...) ([]*JobRunLease, error)
- func (r *PostgresJobRepository) FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)
- func (r *PostgresJobRepository) FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
- type Queries
- func (q *Queries) CountGroup(ctx context.Context, groupID uuid.UUID) (int64, error)
- func (q *Queries) DeleteOldMarkers(ctx context.Context, cutoff time.Time) error
- func (q *Queries) FindActiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
- func (q *Queries) InsertMarker(ctx context.Context, arg InsertMarkerParams) error
- func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsReturnedById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, jobIds []string) error
- func (q *Queries) MarkJobsCancelRequestedBySets(ctx context.Context, jobSets []string) error
- func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []string) error
- func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []string) error
- func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []string) error
- func (q *Queries) SelectAllExecutors(ctx context.Context) ([]Executor, error)
- func (q *Queries) SelectAllJobIds(ctx context.Context) ([]string, error)
- func (q *Queries) SelectAllMarkers(ctx context.Context) ([]Marker, error)
- func (q *Queries) SelectAllRunErrors(ctx context.Context) ([]JobRunError, error)
- func (q *Queries) SelectAllRunIds(ctx context.Context) ([]uuid.UUID, error)
- func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecutorUpdateTimesRow, error)
- func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error)
- func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error)
- func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error)
- func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)
- func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []uuid.UUID) ([]JobRunError, error)
- func (q *Queries) SelectUpdatedJobs(ctx context.Context, arg SelectUpdatedJobsParams) ([]SelectUpdatedJobsRow, error)
- func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error
- func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error
- func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) error
- func (q *Queries) WithTx(tx pgx.Tx) *Queries
- type Queue
- type QueueRepository
- type RedisExecutorRepository
- func (r *RedisExecutorRepository) GetExecutors(_ context.Context) ([]*schedulerobjects.Executor, error)
- func (r *RedisExecutorRepository) GetLastUpdateTimes(_ context.Context) (map[string]time.Time, error)
- func (r *RedisExecutorRepository) StoreExecutor(_ context.Context, executor *schedulerobjects.Executor) error
- type Run
- type SelectExecutorUpdateTimesRow
- type SelectJobsForExecutorParams
- type SelectJobsForExecutorRow
- type SelectNewJobsParams
- type SelectNewRunsForJobsParams
- type SelectNewRunsParams
- type SelectUpdatedJobsParams
- type SelectUpdatedJobsRow
- type UpdateJobPriorityByIdParams
- type UpdateJobPriorityByJobSetParams
- type UpsertExecutorParams
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PruneDb ¶ added in v0.3.47
func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion time.Duration, clock clock.Clock) error
PruneDb removes completed jobs (and related runs and errors) from the database if their `lastUpdateTime` is more than `keepAfterCompletion` in the past. Jobs are deleted in batches across transactions. This means that if this job fails midway through, it still may have deleted some jobs. The function will run until the supplied context is cancelled.
func WithTestDb ¶
func WithTestDb(action func(queries *Queries, db *pgxpool.Pool) error) error
Types ¶
type DBTX ¶
type DBTX interface {
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
QueryRow(context.Context, string, ...interface{}) pgx.Row
}
type Executor ¶
type Executor struct {
ExecutorID string `db:"executor_id"`
LastRequest []byte `db:"last_request"`
LastUpdated time.Time `db:"last_updated"`
}
type ExecutorRepository ¶
type ExecutorRepository interface {
// GetExecutors returns all known executors, regardless of their last heartbeat time
GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
// GetLastUpdateTimes returns a map of executor name -> last heartbeat time
GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
// StoreExecutor persists the latest executor state
StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
}
ExecutorRepository is an interface to be implemented by structs which provide executor information
type InsertMarkerParams ¶ added in v0.3.48
type InsertMarkerParams struct {
GroupID uuid.UUID `db:"group_id"`
PartitionID int32 `db:"partition_id"`
Created time.Time `db:"created"`
}
type Job ¶
type Job struct {
JobID string `db:"job_id"`
JobSet string `db:"job_set"`
Queue string `db:"queue"`
UserID string `db:"user_id"`
Submitted int64 `db:"submitted"`
Groups []byte `db:"groups"`
Priority int64 `db:"priority"`
CancelRequested bool `db:"cancel_requested"`
Cancelled bool `db:"cancelled"`
CancelByJobsetRequested bool `db:"cancel_by_jobset_requested"`
Succeeded bool `db:"succeeded"`
Failed bool `db:"failed"`
SubmitMessage []byte `db:"submit_message"`
SchedulingInfo []byte `db:"scheduling_info"`
Serial int64 `db:"serial"`
LastModified time.Time `db:"last_modified"`
}
func (Job) GetSerial ¶
func (job Job) GetSerial() int64
GetSerial is needed for the HasSerial interface
func (Job) InTerminalState ¶
func (job Job) InTerminalState() bool
InTerminalState returns true if Job is in a terminal state
type JobRepository ¶
type JobRepository interface {
// FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively
// These updates are guaranteed to be consistent with each other
FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)
// FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is
// keyed by job run id. Any dbRuns which don't have errors wil be absent from the map.
FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)
// CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding
// to the provided groupId. This is used by the scheduler to determine if the database represents the state of
// pulsar after a given point in time.
CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)
// FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active
// Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled
FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
// FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while run
// in excludedRunIds will be excluded
FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)
}
JobRepository is an interface to be implemented by structs which provide job and run information
type JobRunError ¶
type JobRunError struct {
RunID uuid.UUID `db:"run_id"`
JobID string `db:"job_id"`
Error []byte `db:"error"`
}
type JobRunLease ¶
type JobRunLease struct {
RunID uuid.UUID
Queue string
JobSet string
UserID string
Node string
Groups []byte
SubmitMessage []byte
}
type LegacyQueueRepository ¶ added in v0.3.47
type LegacyQueueRepository struct {
// contains filtered or unexported fields
}
LegacyQueueRepository is a QueueRepository which is backed by Armada's redis store
func NewLegacyQueueRepository ¶ added in v0.3.47
func NewLegacyQueueRepository(db redis.UniversalClient) *LegacyQueueRepository
func (*LegacyQueueRepository) GetAllQueues ¶ added in v0.3.47
func (r *LegacyQueueRepository) GetAllQueues() ([]*Queue, error)
type Marker ¶
type Marker struct {
GroupID uuid.UUID `db:"group_id"`
PartitionID int32 `db:"partition_id"`
Created time.Time `db:"created"`
}
type PostgresExecutorRepository ¶
type PostgresExecutorRepository struct {
// contains filtered or unexported fields
}
PostgresExecutorRepository is an implementation of ExecutorRepository that stores its state in postgres
func NewPostgresExecutorRepository ¶
func NewPostgresExecutorRepository(db *pgxpool.Pool) *PostgresExecutorRepository
func (*PostgresExecutorRepository) GetExecutors ¶
func (r *PostgresExecutorRepository) GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
GetExecutors returns all known executors, regardless of their last heartbeat time
func (*PostgresExecutorRepository) GetLastUpdateTimes ¶
func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
GetLastUpdateTimes returns a map of executor name -> last heartbeat time
func (*PostgresExecutorRepository) StoreExecutor ¶
func (r *PostgresExecutorRepository) StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
StoreExecutor persists the latest executor state
type PostgresJobRepository ¶
type PostgresJobRepository struct {
// contains filtered or unexported fields
}
PostgresJobRepository is an implementation of JobRepository that stores its state in postgres
func NewPostgresJobRepository ¶
func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRepository
func (*PostgresJobRepository) CountReceivedPartitions ¶
func (r *PostgresJobRepository) CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)
CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding to the provided groupId. This is used by the scheduler to determine if the database represents the state of pulsar after a given point in time.
func (*PostgresJobRepository) FetchJobRunErrors ¶
func (r *PostgresJobRepository) FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)
FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is keyed by job run id. Any dbRuns which don't have errors wil be absent from the map.
func (*PostgresJobRepository) FetchJobRunLeases ¶
func (r *PostgresJobRepository) FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)
FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while run in excludedRunIds will be excluded
func (*PostgresJobRepository) FetchJobUpdates ¶
func (r *PostgresJobRepository) FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)
FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively These updates are guaranteed to be consistent with each other
func (*PostgresJobRepository) FindInactiveRuns ¶
func (r *PostgresJobRepository) FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled
type Queries ¶
type Queries struct {
// contains filtered or unexported fields
}
func (*Queries) CountGroup ¶
func (q *Queries) CountGroup(ctx context.Context, groupID uuid.UUID) (int64, error)
func (*Queries) DeleteOldMarkers ¶ added in v0.3.47
func (q *Queries) DeleteOldMarkers(ctx context.Context, cutoff time.Time) error
func (*Queries) FindActiveRuns ¶
func (q *Queries) FindActiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
func (*Queries) InsertMarker ¶ added in v0.3.48
func (q *Queries) InsertMarker(ctx context.Context, arg InsertMarkerParams) error
func (*Queries) MarkJobRunsFailedById ¶
func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error
func (*Queries) MarkJobRunsReturnedById ¶ added in v0.3.47
func (q *Queries) MarkJobRunsReturnedById(ctx context.Context, runIds []uuid.UUID) error
func (*Queries) MarkJobRunsRunningById ¶
func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error
func (*Queries) MarkJobRunsSucceededById ¶
func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error
func (*Queries) MarkJobsCancelRequestedById ¶ added in v0.3.47
func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, jobIds []string) error
func (*Queries) MarkJobsCancelRequestedBySets ¶ added in v0.3.47
func (q *Queries) MarkJobsCancelRequestedBySets(ctx context.Context, jobSets []string) error
func (*Queries) MarkJobsCancelledById ¶
func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []string) error
func (*Queries) MarkJobsFailedById ¶
func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []string) error
func (*Queries) MarkJobsSucceededById ¶
func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []string) error
func (*Queries) SelectAllExecutors ¶
func (q *Queries) SelectAllExecutors(ctx context.Context) ([]Executor, error)
func (*Queries) SelectAllJobIds ¶ added in v0.3.47
func (q *Queries) SelectAllJobIds(ctx context.Context) ([]string, error)
func (*Queries) SelectAllMarkers ¶ added in v0.3.47
func (q *Queries) SelectAllMarkers(ctx context.Context) ([]Marker, error)
func (*Queries) SelectAllRunErrors ¶ added in v0.3.47
func (q *Queries) SelectAllRunErrors(ctx context.Context) ([]JobRunError, error)
func (*Queries) SelectAllRunIds ¶ added in v0.3.47
func (q *Queries) SelectAllRunIds(ctx context.Context) ([]uuid.UUID, error)
func (*Queries) SelectExecutorUpdateTimes ¶
func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecutorUpdateTimesRow, error)
func (*Queries) SelectJobsForExecutor ¶
func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error)
func (*Queries) SelectNewJobs ¶
func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error)
func (*Queries) SelectNewRuns ¶
func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error)
func (*Queries) SelectNewRunsForJobs ¶
func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)
func (*Queries) SelectRunErrorsById ¶
func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []uuid.UUID) ([]JobRunError, error)
Run errors
func (*Queries) SelectUpdatedJobs ¶
func (q *Queries) SelectUpdatedJobs(ctx context.Context, arg SelectUpdatedJobsParams) ([]SelectUpdatedJobsRow, error)
func (*Queries) UpdateJobPriorityById ¶
func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error
func (*Queries) UpdateJobPriorityByJobSet ¶
func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error
func (*Queries) UpsertExecutor ¶
func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) error
type QueueRepository ¶
type QueueRepository interface {
GetAllQueues() ([]*Queue, error)
}
QueueRepository is an interface to be implemented by structs which provide queue information
type RedisExecutorRepository ¶ added in v0.3.49
type RedisExecutorRepository struct {
// contains filtered or unexported fields
}
func NewRedisExecutorRepository ¶ added in v0.3.49
func NewRedisExecutorRepository(db redis.UniversalClient, schedulerName string) *RedisExecutorRepository
func (*RedisExecutorRepository) GetExecutors ¶ added in v0.3.49
func (r *RedisExecutorRepository) GetExecutors(_ context.Context) ([]*schedulerobjects.Executor, error)
func (*RedisExecutorRepository) GetLastUpdateTimes ¶ added in v0.3.49
func (r *RedisExecutorRepository) GetLastUpdateTimes(_ context.Context) (map[string]time.Time, error)
func (*RedisExecutorRepository) StoreExecutor ¶ added in v0.3.49
func (r *RedisExecutorRepository) StoreExecutor(_ context.Context, executor *schedulerobjects.Executor) error
type Run ¶
type Run struct {
RunID uuid.UUID `db:"run_id"`
JobID string `db:"job_id"`
Created int64 `db:"created"`
JobSet string `db:"job_set"`
Executor string `db:"executor"`
Node string `db:"node"`
Cancelled bool `db:"cancelled"`
Running bool `db:"running"`
Succeeded bool `db:"succeeded"`
Failed bool `db:"failed"`
Returned bool `db:"returned"`
Serial int64 `db:"serial"`
LastModified time.Time `db:"last_modified"`
}
type SelectExecutorUpdateTimesRow ¶
type SelectExecutorUpdateTimesRow struct {
ExecutorID string `db:"executor_id"`
LastUpdated time.Time `db:"last_updated"`
}
type SelectJobsForExecutorParams ¶
type SelectJobsForExecutorParams struct {
Executor string `db:"executor"`
RunIds []uuid.UUID `db:"run_ids"`
}
type SelectJobsForExecutorRow ¶
type SelectJobsForExecutorRow struct {
RunID uuid.UUID `db:"run_id"`
Queue string `db:"queue"`
JobSet string `db:"job_set"`
UserID string `db:"user_id"`
Groups []byte `db:"groups"`
SubmitMessage []byte `db:"submit_message"`
}
type SelectNewJobsParams ¶
type SelectNewJobsParams struct {
Serial int64 `db:"serial"`
Limit int32 `db:"limit"`
}
type SelectNewRunsForJobsParams ¶
type SelectNewRunsForJobsParams struct {
Serial int64 `db:"serial"`
JobIds []string `db:"job_ids"`
}
type SelectNewRunsParams ¶
type SelectNewRunsParams struct {
Serial int64 `db:"serial"`
Limit int32 `db:"limit"`
}
type SelectUpdatedJobsParams ¶
type SelectUpdatedJobsParams struct {
Serial int64 `db:"serial"`
Limit int32 `db:"limit"`
}
type SelectUpdatedJobsRow ¶
type SelectUpdatedJobsRow struct {
JobID string `db:"job_id"`
JobSet string `db:"job_set"`
Queue string `db:"queue"`
Priority int64 `db:"priority"`
Submitted int64 `db:"submitted"`
CancelRequested bool `db:"cancel_requested"`
CancelByJobsetRequested bool `db:"cancel_by_jobset_requested"`
Cancelled bool `db:"cancelled"`
Succeeded bool `db:"succeeded"`
Failed bool `db:"failed"`
SchedulingInfo []byte `db:"scheduling_info"`
Serial int64 `db:"serial"`
}
type UpdateJobPriorityByIdParams ¶
type UpdateJobPriorityByIdParams struct {
Priority int64 `db:"priority"`
JobID string `db:"job_id"`
}
type UpdateJobPriorityByJobSetParams ¶
type UpdateJobPriorityByJobSetParams struct {
Priority int64 `db:"priority"`
JobSet string `db:"job_set"`
}
type UpsertExecutorParams ¶
type UpsertExecutorParams struct {
ExecutorID string `db:"executor_id"`
LastRequest []byte `db:"last_request"`
UpdateTime time.Time `db:"update_time"`
}