Documentation
¶
Index ¶
- Constants
- Variables
- func ConfigSortPath() string
- func DecodeBackendTag(name string) (int64, error)
- func EncodeBackendTag(jobID int64) string
- func InitGlobalLightningEnv()
- func InitInstanceAddr() string
- func NewMemRootImpl(maxQuota int64, bcCtxMgr *litBackendCtxMgr) *memRootImpl
- func RiskOfDiskFull(available, capacity uint64) bool
- type BackendCtx
- type BackendCtxMgr
- type CheckpointManager
- func (s *CheckpointManager) Close()
- func (s *CheckpointManager) IsComplete(end kv.Key) bool
- func (s *CheckpointManager) Register(taskID int, end kv.Key)
- func (s *CheckpointManager) Reset(newPhysicalID int64, start, end kv.Key)
- func (s *CheckpointManager) Status() (int, kv.Key)
- func (s *CheckpointManager) Sync()
- func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error
- func (s *CheckpointManager) UpdateTotal(taskID int, added int, last bool)
- type Config
- type DiskRoot
- type Engine
- type FlushController
- type FlushMode
- type JobReorgMeta
- type MemRoot
- type MockBackendCtx
- func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager)
- func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error
- func (*MockBackendCtx) Done() bool
- func (*MockBackendCtx) FinishImport(indexID int64, _ bool, _ table.Table) error
- func (*MockBackendCtx) Flush(_ int64, _ FlushMode) (flushed bool, imported bool, err error)
- func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager
- func (m *MockBackendCtx) Register(jobID, indexID int64, _, _ string) (Engine, error)
- func (*MockBackendCtx) ResetWorkers(_, _ int64)
- func (*MockBackendCtx) SetDone()
- func (*MockBackendCtx) Unregister(jobID, indexID int64)
- type MockBackendCtxMgr
- type MockEngineInfo
- type MockWriter
- type ReorgCheckpoint
- type TaskCheckpoint
- type Writer
Constants ¶
const ( JobCheckpointVersionCurrent = JobCheckpointVersion1 JobCheckpointVersion1 = 1 )
JobCheckpointVersionCurrent is the current version of the checkpoint.
const ( LitErrAllocMemFail string = "[ddl-ingest] allocate memory failed" LitErrCreateDirFail string = "[ddl-ingest] create ingest sort path error" LitErrStatDirFail string = "[ddl-ingest] stat ingest sort path error" LitErrDeleteDirFail string = "[ddl-ingest] delete ingest sort path error" LitErrCreateBackendFail string = "[ddl-ingest] build ingest backend failed" LitErrGetBackendFail string = "[ddl-ingest] cannot get ingest backend" LitErrCreateEngineFail string = "[ddl-ingest] build ingest engine failed" LitErrCreateContextFail string = "[ddl-ingest] build ingest writer context failed" LitErrGetEngineFail string = "[ddl-ingest] can not get ingest engine info" LitErrGetStorageQuota string = "[ddl-ingest] get storage quota error" LitErrCloseEngineErr string = "[ddl-ingest] close engine error" LitErrCleanEngineErr string = "[ddl-ingest] clean engine error" LitErrFlushEngineErr string = "[ddl-ingest] flush engine data err" LitErrIngestDataErr string = "[ddl-ingest] ingest data into storage error" LitErrRemoteDupExistErr string = "[ddl-ingest] remote duplicate index key exist" LitErrExceedConcurrency string = "[ddl-ingest] the concurrency is greater than ingest limit" LitErrUpdateDiskStats string = "[ddl-ingest] update disk usage error" LitWarnEnvInitFail string = "[ddl-ingest] initialize environment failed" LitWarnConfigError string = "[ddl-ingest] build config for backend failed" LitInfoEnvInitSucc string = "[ddl-ingest] init global ingest backend environment finished" LitInfoSortDir string = "[ddl-ingest] the ingest sorted directory" LitInfoCreateBackend string = "[ddl-ingest] create one backend for an DDL job" LitInfoCloseBackend string = "[ddl-ingest] close one backend for DDL job" LitInfoOpenEngine string = "[ddl-ingest] open an engine for index reorg task" LitInfoAddWriter string = "[ddl-ingest] reuse engine and add a writer for index reorg task" LitInfoCreateWrite string = "[ddl-ingest] create one local writer for index reorg task" LitInfoCloseEngine string = 
"[ddl-ingest] flush all writer and get closed engine" LitInfoRemoteDupCheck string = "[ddl-ingest] start remote duplicate checking" LitInfoStartImport string = "[ddl-ingest] start to import data" LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest" LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest" LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage" LitErrCloseWriterErr string = "[ddl-ingest] close writer error" )
Message text constants for ingest errors (LitErr*), warnings (LitWarn*), and informational messages (LitInfo*).
Variables ¶
var ( // LitBackCtxMgr is the entry for the lightning backfill process. LitBackCtxMgr BackendCtxMgr // LitMemRoot is used to track the memory usage of the lightning backfill process. LitMemRoot MemRoot // LitDiskRoot is used to track the disk usage of the lightning backfill process. LitDiskRoot DiskRoot // LitRLimit is the max open file number of the lightning backfill process. LitRLimit uint64 // LitSortPath is the sort path for the lightning backfill process. LitSortPath string // LitInitialized is the flag that indicates whether the lightning backfill process is initialized. LitInitialized bool )
var ( // StructSizeBackendCtx is the size of litBackendCtx. StructSizeBackendCtx int64 // StructSizeEngineInfo is the size of engineInfo. StructSizeEngineInfo int64 // StructSizeWriterCtx is the size of writerContext. StructSizeWriterCtx int64 )
var GenLightningDataDirForTest = genLightningDataDir
GenLightningDataDirForTest is only used for test.
var ImporterRangeConcurrencyForTest *atomic.Int32
ImporterRangeConcurrencyForTest is only used for test.
Functions ¶
func ConfigSortPath ¶
func ConfigSortPath() string
ConfigSortPath returns the sort path for lightning.
func DecodeBackendTag ¶
DecodeBackendTag decodes the backend tag to job ID.
func EncodeBackendTag ¶
EncodeBackendTag encodes the job ID to backend tag. The backend tag is also used as the file name of the local index data files.
func InitGlobalLightningEnv ¶
func InitGlobalLightningEnv()
InitGlobalLightningEnv initializes the Lightning backfill environment.
func InitInstanceAddr ¶
func InitInstanceAddr() string
InitInstanceAddr returns a string that concatenates the instance address and the temp-dir.
func NewMemRootImpl ¶
func NewMemRootImpl(maxQuota int64, bcCtxMgr *litBackendCtxMgr) *memRootImpl
NewMemRootImpl creates a new memRootImpl.
func RiskOfDiskFull ¶
RiskOfDiskFull checks if the disk has less than 10% of its capacity available.
Types ¶
type BackendCtx ¶
type BackendCtx interface { Register(jobID, indexID int64, schemaName, tableName string) (Engine, error) Unregister(jobID, indexID int64) CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error FinishImport(indexID int64, unique bool, tbl table.Table) error ResetWorkers(jobID, indexID int64) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) Done() bool SetDone() AttachCheckpointManager(*CheckpointManager) GetCheckpointManager() *CheckpointManager }
BackendCtx is the backend context for add index reorg task.
type BackendCtxMgr ¶
type BackendCtxMgr interface { CheckAvailable() (bool, error) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) Unregister(jobID int64) Load(jobID int64) (BackendCtx, bool) }
BackendCtxMgr is used to manage the backend context.
type CheckpointManager ¶
type CheckpointManager struct {
// contains filtered or unexported fields
}
CheckpointManager is a checkpoint manager implementation that is used by non-distributed reorganization.
func NewCheckpointManager ¶
func NewCheckpointManager(ctx context.Context, flushCtrl FlushController, sessPool *sess.Pool, jobID, indexID int64) (*CheckpointManager, error)
NewCheckpointManager creates a new checkpoint manager.
func (*CheckpointManager) Close ¶
func (s *CheckpointManager) Close()
Close closes the checkpoint manager.
func (*CheckpointManager) IsComplete ¶
func (s *CheckpointManager) IsComplete(end kv.Key) bool
IsComplete checks if the task is complete. This is called before the reader reads the data and decides whether to skip the current task.
func (*CheckpointManager) Register ¶
func (s *CheckpointManager) Register(taskID int, end kv.Key)
Register registers a new task.
func (*CheckpointManager) Reset ¶
func (s *CheckpointManager) Reset(newPhysicalID int64, start, end kv.Key)
Reset resets the checkpoint manager between two partitions.
func (*CheckpointManager) Status ¶
func (s *CheckpointManager) Status() (int, kv.Key)
Status returns the status of the checkpoint.
func (*CheckpointManager) UpdateCurrent ¶
func (s *CheckpointManager) UpdateCurrent(taskID int, added int) error
UpdateCurrent updates the current keys of the task. This is called by the writer after writing the local engine to update the current number of rows written.
func (*CheckpointManager) UpdateTotal ¶
func (s *CheckpointManager) UpdateTotal(taskID int, added int, last bool)
UpdateTotal updates the total keys of the task. This is called by the reader after reading the data to update the number of rows contained in the current chunk.
type DiskRoot ¶
type DiskRoot interface { UpdateUsage() ShouldImport() bool UsageInfo() string PreCheckUsage() error StartupCheck() error }
DiskRoot is used to track the disk usage for the lightning backfill process.
func NewDiskRootImpl ¶
NewDiskRootImpl creates a new DiskRoot.
type Engine ¶
type Engine interface { Flush() error ImportAndClean() error Clean() CreateWriter(id int, unique bool) (Writer, error) }
Engine is the interface for the engine that can be used to write key-value pairs.
type FlushController ¶
type FlushController interface {
Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
}
FlushController is an interface to control the flush of the checkpoint.
type FlushMode ¶
type FlushMode byte
FlushMode is used to control how to flush.
const ( // FlushModeAuto means flush when the memory table size reaches the threshold. FlushModeAuto FlushMode = iota // FlushModeForceLocal means flush all data to local storage. FlushModeForceLocal // FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota. FlushModeForceLocalAndCheckDiskQuota // FlushModeForceGlobal means import all data in local storage to global storage. FlushModeForceGlobal )
type JobReorgMeta ¶
type JobReorgMeta struct {
Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"`
}
JobReorgMeta is the metadata for a reorg job.
type MemRoot ¶
type MemRoot interface { Consume(size int64) Release(size int64) CheckConsume(size int64) bool ConsumeWithTag(tag string, size int64) ReleaseWithTag(tag string) SetMaxMemoryQuota(quota int64) MaxMemoryQuota() int64 CurrentUsage() int64 CurrentUsageWithTag(tag string) int64 RefreshConsumption() }
MemRoot is used to track the memory usage for the lightning backfill process.
type MockBackendCtx ¶
type MockBackendCtx struct {
// contains filtered or unexported fields
}
MockBackendCtx is a mock backend context.
func (*MockBackendCtx) AttachCheckpointManager ¶
func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager)
AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (*MockBackendCtx) CollectRemoteDuplicateRows ¶
func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error
CollectRemoteDuplicateRows implements BackendCtx.CollectRemoteDuplicateRows interface.
func (*MockBackendCtx) Done ¶
func (*MockBackendCtx) Done() bool
Done implements BackendCtx.Done interface.
func (*MockBackendCtx) FinishImport ¶
FinishImport implements BackendCtx.FinishImport interface.
func (*MockBackendCtx) GetCheckpointManager ¶
func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager
GetCheckpointManager returns the checkpoint manager attached to the backend context.
func (*MockBackendCtx) Register ¶
func (m *MockBackendCtx) Register(jobID, indexID int64, _, _ string) (Engine, error)
Register implements BackendCtx.Register interface.
func (*MockBackendCtx) ResetWorkers ¶
func (*MockBackendCtx) ResetWorkers(_, _ int64)
ResetWorkers implements BackendCtx.ResetWorkers interface.
func (*MockBackendCtx) SetDone ¶
func (*MockBackendCtx) SetDone()
SetDone implements BackendCtx.SetDone interface.
func (*MockBackendCtx) Unregister ¶
func (*MockBackendCtx) Unregister(jobID, indexID int64)
Unregister implements BackendCtx.Unregister interface.
type MockBackendCtxMgr ¶
type MockBackendCtxMgr struct {
// contains filtered or unexported fields
}
MockBackendCtxMgr is a mock backend context manager.
func NewMockBackendCtxMgr ¶
func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBackendCtxMgr
NewMockBackendCtxMgr creates a new mock backend context manager.
func (*MockBackendCtxMgr) CheckAvailable ¶
func (*MockBackendCtxMgr) CheckAvailable() (bool, error)
CheckAvailable implements BackendCtxMgr.CheckAvailable interface.
func (*MockBackendCtxMgr) Load ¶
func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool)
Load implements BackendCtxMgr.Load interface.
func (*MockBackendCtxMgr) Register ¶
func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error)
Register implements BackendCtxMgr.Register interface.
func (*MockBackendCtxMgr) Unregister ¶
func (m *MockBackendCtxMgr) Unregister(jobID int64)
Unregister implements BackendCtxMgr.Unregister interface.
type MockEngineInfo ¶
type MockEngineInfo struct {
// contains filtered or unexported fields
}
MockEngineInfo is a mock engine info.
func (*MockEngineInfo) Clean ¶
func (*MockEngineInfo) Clean()
Clean implements Engine.Clean interface.
func (*MockEngineInfo) CreateWriter ¶
func (m *MockEngineInfo) CreateWriter(id int, _ bool) (Writer, error)
CreateWriter implements Engine.CreateWriter interface.
func (*MockEngineInfo) Flush ¶
func (*MockEngineInfo) Flush() error
Flush implements Engine.Flush interface.
func (*MockEngineInfo) ImportAndClean ¶
func (*MockEngineInfo) ImportAndClean() error
ImportAndClean implements Engine.ImportAndClean interface.
type MockWriter ¶
type MockWriter struct {
// contains filtered or unexported fields
}
MockWriter is a mock writer.
func (*MockWriter) LockForWrite ¶
func (*MockWriter) LockForWrite() func()
LockForWrite implements Writer.LockForWrite interface.
type ReorgCheckpoint ¶
type ReorgCheckpoint struct { LocalSyncKey kv.Key `json:"local_sync_key"` LocalKeyCount int `json:"local_key_count"` GlobalSyncKey kv.Key `json:"global_sync_key"` GlobalKeyCount int `json:"global_key_count"` InstanceAddr string `json:"instance_addr"` PhysicalID int64 `json:"physical_id"` StartKey kv.Key `json:"start_key"` EndKey kv.Key `json:"end_key"` Version int64 `json:"version"` }
ReorgCheckpoint is the checkpoint for a reorg job.
type TaskCheckpoint ¶
type TaskCheckpoint struct {
// contains filtered or unexported fields
}
TaskCheckpoint is the checkpoint for a single task.