Documentation
Index ¶
- Variables
- func UpdateJobExpectedStatus(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, ...) error
- type Job
- func (j *Job) CancelJob(ctx context.Context) (err error)
- func (j *Job) DropJob(ctx context.Context) error
- func (j *Job) FailJob(ctx context.Context, result string) error
- func (j *Job) FinishJob(ctx context.Context, result string) error
- func (j *Job) GetJobInfo(ctx context.Context) (*JobInfo, error)
- func (j *Job) GetJobStatus(ctx context.Context) (JobStatus, string, error)
- func (j *Job) OnComplete(inErr error, msg string)
- func (j *Job) ProgressUpdateRoutineFn(ctx context.Context, finishCh chan struct{}, errCh <-chan struct{}, ...) error
- func (j *Job) StartJob(ctx context.Context) error
- func (j *Job) UpdateJobProgress(ctx context.Context, progress string) (bool, error)
- type JobExpectedStatus
- type JobInfo
- type JobStatus
- type LogicalImportProgress
- type PhysicalImportProgress
- type Progress
Constants ¶
This section is empty.
Variables ¶
var (
	// TestSyncCh is used in unit test to synchronize the execution of LOAD DATA.
	TestSyncCh = make(chan struct{})
	// TestLastLoadDataJobID last created job id, used in unit test.
	TestLastLoadDataJobID atomic.Int64
)
Variables used for tests.
var (
	// HeartBeatInSec is the interval of heartbeat.
	HeartBeatInSec = 5
	// OfflineThresholdInSec means after failing to update heartbeat for 3 times,
	// we treat the worker of the job as offline.
	OfflineThresholdInSec = HeartBeatInSec * 3
)
Functions ¶
func UpdateJobExpectedStatus ¶
func UpdateJobExpectedStatus(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	jobID int64,
	status JobExpectedStatus,
) error
UpdateJobExpectedStatus updates the expected status of a load data job. TODO: remove it?
Types ¶
type Job ¶
type Job struct {
	ID int64
	// Job don't manage the life cycle of the connection.
	Conn sqlexec.SQLExecutor
	User string
}
Job is an import job.
func CreateLoadDataJob ¶
func CreateLoadDataJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	dataSource, db, table string,
	importMode string,
	user string,
) (*Job, error)
CreateLoadDataJob creates a load data job by inserting a record into a system table. The AUTO_INCREMENT value will be returned as jobID.
func NewJob ¶
func NewJob(ID int64, conn sqlexec.SQLExecutor, user string) *Job
NewJob returns a new Job.
func (*Job) CancelJob ¶
CancelJob cancels a load data job. Only a running/paused job can be canceled.
func (*Job) GetJobInfo ¶
GetJobInfo gets all needed information of a load data job.
func (*Job) GetJobStatus ¶
GetJobStatus gets the status of a load data job. A non-nil returned error means something went wrong when querying the database. Other business logic errors are returned as JobFailed with a message.
func (*Job) OnComplete ¶
OnComplete is called when a job is finished or failed.
func (*Job) ProgressUpdateRoutineFn ¶
func (j *Job) ProgressUpdateRoutineFn(ctx context.Context, finishCh chan struct{}, errCh <-chan struct{}, progress *Progress) error
ProgressUpdateRoutineFn is the job progress update routine.
func (*Job) StartJob ¶
StartJob tries to start a not-yet-started job with jobID. It does not return an error when there is no matching job.
func (*Job) UpdateJobProgress ¶
UpdateJobProgress updates the progress of a load data job. It should be called periodically as a heartbeat after StartJob. The returned bool indicates whether the keepalive succeeded. If not, the caller should call FailJob soon. TODO: Currently, if the node crashes after CreateLoadDataJob and before StartJob, the job will stay in the pending status forever. Maybe we should unify CreateLoadDataJob and StartJob.
type JobExpectedStatus ¶
type JobExpectedStatus int
JobExpectedStatus is the expected status of a load data job. User can set the expected status of a job and worker will respect it.
const (
	// JobExpectedRunning means the job is expected to be running.
	JobExpectedRunning JobExpectedStatus = iota
	// JobExpectedPaused means the job is expected to be paused.
	JobExpectedPaused
	// JobExpectedCanceled means the job is expected to be canceled.
	JobExpectedCanceled
)
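As a rough illustration of how a worker might respect the expected status, the sketch below redeclares the type and constants locally and maps each value to an action. The helper `workerAction` and its return strings are hypothetical; the source does not say how the worker actually reacts.

```go
package main

import "fmt"

// Local copy of the type and constants documented above.
type JobExpectedStatus int

const (
	JobExpectedRunning JobExpectedStatus = iota
	JobExpectedPaused
	JobExpectedCanceled
)

// workerAction is a hypothetical helper (not part of the package) that
// maps the user-set expected status to what a worker could do next.
func workerAction(s JobExpectedStatus) string {
	switch s {
	case JobExpectedRunning:
		return "continue"
	case JobExpectedPaused:
		return "pause"
	case JobExpectedCanceled:
		return "cancel"
	default:
		return "unknown"
	}
}

func main() {
	for s := JobExpectedRunning; s <= JobExpectedCanceled; s++ {
		fmt.Printf("expected status %d: %s\n", s, workerAction(s))
	}
}
```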
type JobInfo ¶
type JobInfo struct {
	JobID         int64
	User          string
	DataSource    string
	TableSchema   string
	TableName     string
	ImportMode    string
	Progress      string
	Status        JobStatus
	StatusMessage string
	CreateTime    types.Time
	StartTime     types.Time
	EndTime       types.Time
}
JobInfo is the information of a load data job.
func GetAllJobInfo ¶
func GetAllJobInfo(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	user string,
) ([]*JobInfo, error)
GetAllJobInfo gets the status of all jobs of a user.
type JobStatus ¶
type JobStatus int
JobStatus represents the status of a load data job.
const (
	// JobFailed means the job is failed and can't be resumed.
	JobFailed JobStatus = iota
	// JobCanceled means the job is canceled by user and can't be resumed. It
	// will finally convert to JobFailed with a message indicating the reason
	// is canceled.
	JobCanceled
	// JobPaused means the job is paused by user and can be resumed.
	JobPaused
	// JobFinished means the job is finished.
	JobFinished
	// JobPending means the job is pending to be started.
	JobPending
	// JobRunning means the job is running.
	JobRunning
)
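The status rules stated in these comments and in the CancelJob description (only a running or paused job can be canceled) can be sketched with local copies of the constants. The helpers `canCancel` and `isEnded` are hypothetical and only restate the documented rules; they are not functions of this package.

```go
package main

import "fmt"

// Local copy of the type and constants documented above.
type JobStatus int

const (
	JobFailed JobStatus = iota
	JobCanceled
	JobPaused
	JobFinished
	JobPending
	JobRunning
)

// canCancel is a hypothetical helper mirroring the CancelJob doc:
// only a running or paused job can be canceled.
func canCancel(s JobStatus) bool {
	return s == JobRunning || s == JobPaused
}

// isEnded is a hypothetical helper: failed, canceled, and finished jobs
// cannot be resumed according to the constant comments.
func isEnded(s JobStatus) bool {
	switch s {
	case JobFailed, JobCanceled, JobFinished:
		return true
	}
	return false
}

func main() {
	for s := JobFailed; s <= JobRunning; s++ {
		fmt.Printf("status=%d cancelable=%t ended=%t\n", s, canCancel(s), isEnded(s))
	}
}
```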
type LogicalImportProgress ¶
type LogicalImportProgress struct {
	// LoadedFileSize is the size of the data that's loaded in bytes. It's
	// larger than the actual loaded data size, but due to the fact that reading
	// is once-a-block and a block may generate multiple tasks that are
	// concurrently executed, we can't know the actual loaded data size easily.
	LoadedFileSize atomic.Int64
}
LogicalImportProgress is the progress info of the logical import mode.
type PhysicalImportProgress ¶
type PhysicalImportProgress struct {
	// ReadRowCnt is the number of rows read from data files.
	// Lines ignored by IGNORE N LINES clause is not included.
	ReadRowCnt atomic.Uint64
	// EncodeFileSize is the size of the file that has finished KV encoding in bytes.
	// it should equal to SourceFileSize eventually.
	EncodeFileSize atomic.Int64
}
PhysicalImportProgress is the progress info of the physical import mode.
type Progress ¶
type Progress struct {
	// SourceFileSize is the size of the source file in bytes. When we can't get
	// the size of the source file, it will be set to -1.
	// Currently, the value is read by seek(0, end), when LOAD DATA LOCAL we wrap
	// SimpleSeekerOnReadCloser on MySQL client connection which doesn't support
	// it.
	SourceFileSize int64
	*LogicalImportProgress  `json:",inline"`
	*PhysicalImportProgress `json:",inline"`
	// LoadedRowCnt is the number of rows that has been loaded.
	// for physical mode, it's the number of rows that has been imported into TiKV.
	// in SHOW LOAD JOB we call it Imported_Rows, to make it compatible with 7.0,
	// the variable name is not changed.
	LoadedRowCnt atomic.Uint64
}
Progress is the progress of the LOAD DATA task.
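Because SourceFileSize is -1 when the size cannot be determined, any percentage derived from it needs a guard. The sketch below shows one way a caller might do that; `percentDone` is a hypothetical helper, and using LoadedFileSize or EncodeFileSize as the numerator is an assumption about how a caller would report progress, not something the package documents.

```go
package main

import "fmt"

// percentDone is a hypothetical helper (not part of the package).
// doneBytes would be LoadedFileSize in logical mode or EncodeFileSize in
// physical mode. The second result is false when the source size is
// unknown (-1) or zero, so no percentage can be computed.
func percentDone(sourceFileSize, doneBytes int64) (float64, bool) {
	if sourceFileSize <= 0 {
		return 0, false
	}
	return float64(doneBytes) / float64(sourceFileSize) * 100, true
}

func main() {
	if pct, ok := percentDone(200, 50); ok {
		fmt.Printf("%.1f%% done\n", pct)
	}
	if _, ok := percentDone(-1, 50); !ok {
		fmt.Println("source size unknown, percentage not available")
	}
}
```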
func NewProgress ¶
NewProgress creates a new Progress. TODO: it would be better to pass the import mode, but that causes an import cycle.
func ProgressFromJSON ¶
ProgressFromJSON creates Progress from a JSON string.