Documentation
¶
Index ¶
- func NewInstructionConverter(metrics *metrics.Metrics, priorityClasses map[string]types.PriorityClass, ...) ingest.InstructionConverter[*DbOperationsWithMessageIds]
- func NewSchedulerDb(db *pgxpool.Pool, metrics *metrics.Metrics, initialBackOff time.Duration, ...) ingest.Sink[*DbOperationsWithMessageIds]
- func Run(config Configuration)
- type Configuration
- type DbOperation
- type DbOperationsWithMessageIds
- type InsertJobRunErrors
- type InsertJobs
- type InsertPartitionMarker
- type InsertRuns
- type InstructionConverter
- type JobQueuedStateUpdate
- type JobRunDetails
- type JobRunFailed
- type JobSchedulingInfoUpdate
- type JobSetCancelAction
- type JobSetKey
- type JobSetOperation
- type MarkJobSetsCancelRequested
- type MarkJobsCancelRequested
- type MarkJobsCancelled
- type MarkJobsFailed
- type MarkJobsSucceeded
- type MarkRunsFailed
- type MarkRunsRunning
- type MarkRunsSucceeded
- type SchedulerDb
- type UpdateJobPriorities
- type UpdateJobQueuedState
- type UpdateJobSchedulingInfo
- type UpdateJobSetPriorities
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewInstructionConverter ¶
func NewInstructionConverter(
metrics *metrics.Metrics,
priorityClasses map[string]types.PriorityClass,
compressor compress.Compressor,
) ingest.InstructionConverter[*DbOperationsWithMessageIds]
func NewSchedulerDb ¶
func NewSchedulerDb(
db *pgxpool.Pool,
metrics *metrics.Metrics,
initialBackOff time.Duration,
maxBackOff time.Duration,
lockTimeout time.Duration,
) ingest.Sink[*DbOperationsWithMessageIds]
Types ¶
type Configuration ¶
type Configuration struct {
// Database configuration
Postgres configuration.PostgresConfig
// Metrics configuration
Metrics configuration.MetricsConfig
// General Pulsar configuration
Pulsar configuration.PulsarConfig
// Map of allowed priority classes by name
PriorityClasses map[string]types.PriorityClass
// Pulsar subscription name
SubscriptionName string
// Number of messages that will be batched together before being inserted into the database
BatchSize int
// Maximum time since the last batch before a batch will be inserted into the database
BatchDuration time.Duration
// Time for which the pulsar consumer will wait for a new message before retrying
PulsarReceiveTimeout time.Duration
// Time for which the pulsar consumer will back off after receiving an error on trying to receive a message
PulsarBackoffTime time.Duration
}
type DbOperation ¶
type DbOperation interface {
// a.Merge(b) attempts to merge b into a, creating a single combined op.
// Returns true if merging was successful.
// If successful, modifies a in-place.
// If not successful, neither op is mutated.
Merge(DbOperation) bool
// a.CanBeAppliedBefore(b) returns true if a can be placed before b
// without changing the end result of the overall set of operations.
CanBeAppliedBefore(DbOperation) bool
}
DbOperation captures a generic batch database operation.
There are 5 types of operations: - Insert jobs (i.e., add new jobs to the schedulerdb). - Insert runs (i.e., add new runs to the schedulerdb). - Job set operations (i.e., modify all jobs and runs in the schedulerdb part of a given job set). - Job operations (i.e., modify particular jobs). - Job run operations (i.e., modify particular runs).
To improve performance, several ops can be merged into a single op if of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.
Specifically, an op can be applied before another if: - Insert jobs: if prior op doesn't affect the job set. - Insert runs: if prior op doesn't affect the job set or defines the corresponding job. - Job set operations: if not affecting a job defined in prior op. - Job operations: if not affecting a job defined in a prior op. - Job run operations: if not affecting a run defined in a prior op.
In addition, UpdateJobPriorities can never be applied beforee UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.
func AppendDbOperation ¶
func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation
AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.
type DbOperationsWithMessageIds ¶
type DbOperationsWithMessageIds struct {
Ops []DbOperation
MessageIds []pulsar.MessageID
}
DbOperationsWithMessageIds bundles a sequence of schedulerdb ops with the ids of all Pulsar messages that were consumed to produce it.
func (*DbOperationsWithMessageIds) GetMessageIDs ¶
func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID
type InsertJobRunErrors ¶ added in v0.3.47
type InsertJobRunErrors map[uuid.UUID]*schedulerdb.JobRunError
func (InsertJobRunErrors) CanBeAppliedBefore ¶ added in v0.3.47
func (a InsertJobRunErrors) CanBeAppliedBefore(_ DbOperation) bool
type InsertJobs ¶
type InsertJobs map[string]*schedulerdb.Job
func (InsertJobs) CanBeAppliedBefore ¶
func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool
type InsertPartitionMarker ¶ added in v0.3.48
type InsertPartitionMarker struct {
// contains filtered or unexported fields
}
func (*InsertPartitionMarker) CanBeAppliedBefore ¶ added in v0.3.48
func (a *InsertPartitionMarker) CanBeAppliedBefore(b DbOperation) bool
type InsertRuns ¶
type InsertRuns map[uuid.UUID]*JobRunDetails
func (InsertRuns) CanBeAppliedBefore ¶
func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool
type InstructionConverter ¶
type InstructionConverter struct {
// contains filtered or unexported fields
}
type JobQueuedStateUpdate ¶ added in v0.3.63
type JobQueuedStateUpdate struct {
Queued bool
QueuedStateVersion int32
}
type JobRunDetails ¶ added in v0.3.68
type JobRunDetails struct {
// contains filtered or unexported fields
}
type JobRunFailed ¶ added in v0.3.47
type JobRunFailed struct {
LeaseReturned bool
RunAttempted bool
}
type JobSchedulingInfoUpdate ¶ added in v0.3.63
type JobSchedulingInfoUpdate struct {
JobSchedulingInfo []byte
JobSchedulingInfoVersion int32
}
type JobSetCancelAction ¶ added in v0.3.68
type JobSetCancelAction struct {
// contains filtered or unexported fields
}
type JobSetKey ¶ added in v0.3.68
type JobSetKey struct {
// contains filtered or unexported fields
}
type JobSetOperation ¶
type JobSetOperation interface {
AffectsJobSet(queue string, jobSet string) bool
}
type MarkJobSetsCancelRequested ¶ added in v0.3.47
type MarkJobSetsCancelRequested map[JobSetKey]*JobSetCancelAction
func (MarkJobSetsCancelRequested) AffectsJobSet ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) AffectsJobSet(queue string, jobSet string) bool
func (MarkJobSetsCancelRequested) CanBeAppliedBefore ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) CanBeAppliedBefore(b DbOperation) bool
type MarkJobsCancelRequested ¶ added in v0.3.47
type MarkJobsCancelRequested map[string]bool
func (MarkJobsCancelRequested) CanBeAppliedBefore ¶ added in v0.3.47
func (a MarkJobsCancelRequested) CanBeAppliedBefore(b DbOperation) bool
type MarkJobsCancelled ¶
type MarkJobsCancelled map[string]bool
func (MarkJobsCancelled) CanBeAppliedBefore ¶
func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool
type MarkJobsFailed ¶
type MarkJobsFailed map[string]bool
func (MarkJobsFailed) CanBeAppliedBefore ¶
func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool
type MarkJobsSucceeded ¶
type MarkJobsSucceeded map[string]bool
func (MarkJobsSucceeded) CanBeAppliedBefore ¶
func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool
type MarkRunsFailed ¶
type MarkRunsFailed map[uuid.UUID]*JobRunFailed
func (MarkRunsFailed) CanBeAppliedBefore ¶
func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool
type MarkRunsRunning ¶
type MarkRunsRunning map[uuid.UUID]bool
func (MarkRunsRunning) CanBeAppliedBefore ¶
func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool
type MarkRunsSucceeded ¶
type MarkRunsSucceeded map[uuid.UUID]bool
func (MarkRunsSucceeded) CanBeAppliedBefore ¶
func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool
type SchedulerDb ¶
type SchedulerDb struct {
// contains filtered or unexported fields
}
SchedulerDb writes DbOperations into postgres.
func (*SchedulerDb) Store ¶
func (s *SchedulerDb) Store(ctx context.Context, instructions *DbOperationsWithMessageIds) error
Store persists all operations in the database. This function retires until it either succeeds or encounters a terminal error. This function locks the postgres table to avoid write conflicts; see acquireLock() for details.
type UpdateJobPriorities ¶
type UpdateJobPriorities map[string]int64
func (UpdateJobPriorities) CanBeAppliedBefore ¶
func (a UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool
type UpdateJobQueuedState ¶ added in v0.3.63
type UpdateJobQueuedState map[string]*JobQueuedStateUpdate
func (UpdateJobQueuedState) CanBeAppliedBefore ¶ added in v0.3.63
func (a UpdateJobQueuedState) CanBeAppliedBefore(b DbOperation) bool
type UpdateJobSchedulingInfo ¶ added in v0.3.63
type UpdateJobSchedulingInfo map[string]*JobSchedulingInfoUpdate
func (UpdateJobSchedulingInfo) CanBeAppliedBefore ¶ added in v0.3.63
func (a UpdateJobSchedulingInfo) CanBeAppliedBefore(b DbOperation) bool
type UpdateJobSetPriorities ¶
type UpdateJobSetPriorities map[JobSetKey]int64
func (UpdateJobSetPriorities) AffectsJobSet ¶
func (a UpdateJobSetPriorities) AffectsJobSet(queue string, jobSet string) bool
func (UpdateJobSetPriorities) CanBeAppliedBefore ¶
func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool