Documentation
¶
Index ¶
- Constants
- Variables
- func HLCDecode(encodedHLC uint64) (uint64, uint32)
- func HLCNow() uint64
- func IsConcurrent(hlc1, hlc2 uint64) bool
- type DBEngine
- type Engine
- func (e *Engine) BatchDelete(keys [][]byte) error
- func (e *Engine) BatchDeleteRows(rowKeys [][]byte) error
- func (e *Engine) BatchPut(key, value [][]byte) error
- func (e *Engine) BtreeSnapshot(w io.Writer) (int64, error)
- func (e *Engine) Close(ctx context.Context) error
- func (e *Engine) CurrentOffset() *Offset
- func (e *Engine) Delete(key []byte) error
- func (e *Engine) DeleteColumnsForRow(rowKey []byte, columnEntries map[string][]byte) error
- func (e *Engine) DeleteColumnsForRows(rowKeys [][]byte, columnEntries []map[string][]byte) error
- func (e *Engine) DeleteRow(rowKey []byte) error
- func (e *Engine) Get(key []byte) ([]byte, error)
- func (e *Engine) GetRowColumns(rowKey string, predicate func(columnKey string) bool) (map[string][]byte, error)
- func (e *Engine) GetWalCheckPoint() (*internal.Metadata, error)
- func (e *Engine) Namespace() string
- func (e *Engine) NewReader() (*Reader, error)
- func (e *Engine) NewReaderWithStart(startPos *Offset) (*Reader, error)
- func (e *Engine) NewTxn(txnType logrecord.LogOperationType, valueType logrecord.LogEntryType) (*Txn, error)
- func (e *Engine) OpsFlushedCount() uint64
- func (e *Engine) OpsReceivedCount() uint64
- func (e *Engine) Put(key, value []byte) error
- func (e *Engine) PutColumnsForRow(rowKey []byte, columnEntries map[string][]byte) error
- func (e *Engine) PutColumnsForRows(rowKeys [][]byte, columnEntriesPerRow []map[string][]byte) error
- func (e *Engine) RecoveredWALCount() int
- func (e *Engine) WaitForAppend(ctx context.Context, timeout time.Duration, lastSeen *Offset) error
- type EngineConfig
- type Offset
- type Reader
- type ReplicaWALHandler
- type Txn
- func (t *Txn) AppendColumnTxn(rowKey []byte, columnEntries map[string][]byte) error
- func (t *Txn) AppendKVTxn(key []byte, value []byte) error
- func (t *Txn) Checksum() uint32
- func (t *Txn) ChunkedValueChecksum() uint32
- func (t *Txn) Commit() error
- func (t *Txn) CommitOffset() *Offset
- func (t *Txn) TxnID() []byte
Constants ¶
const (
// CustomEpochMs is a Sentinel Value for Jan 1, 2025 @ 00:00:00 UTC.
CustomEpochMs = 1735689600000
)
Variables ¶
var ( // ErrKeyNotFound is a sentinel error for missing keys. ErrKeyNotFound = kvdrivers.ErrKeyNotFound ErrBucketNotFound = kvdrivers.ErrBucketNotFound ErrRecordCorrupted = kvdrivers.ErrRecordCorrupted ErrUseGetColumnAPI = kvdrivers.ErrUseGetColumnAPI ErrInCloseProcess = errors.New("in-Close process") ErrDatabaseDirInUse = errors.New("pid.lock is held by another process") ErrInternalError = errors.New("internal error") ErrMisMatchKeyType = errors.New("mismatch key type with existing value") )
var ( ErrInvalidLSN = errors.New("invalid LSN") ErrInvalidOffset = errors.New("appendLog: offset does not match record") )
var ( ErrTxnAlreadyCommitted = errors.New("txn already committed") ErrKeyChangedForChunkedType = errors.New("chunked txn type cannot change key from first value") ErrUnsupportedTxnType = errors.New("unsupported txn type") ErrEmptyColumns = errors.New("empty column set") )
var ( // ErrWaitTimeoutExceeded is a sentinel error denoting that waiting on a sync.Cond expired due to a timeout. ErrWaitTimeoutExceeded = errors.New("wait timeout exceeded") )
Functions ¶
func HLCNow ¶
func HLCNow() uint64
HLCNow returns an encoded hybrid logical clock: 41 bits hold the millisecond timestamp since the custom epoch, and 23 bits hold the logical counter.
func IsConcurrent ¶
IsConcurrent returns true if the two hybrid logical clocks happened at the same time but are distinct events within a millisecond interval.
Types ¶
type DBEngine ¶
type DBEngine string
DBEngine specifies which BtreeStore engine to use for the underlying persistent storage.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine manages WAL, MemTable (SkipList), and BtreeStore for a given namespace.
func NewStorageEngine ¶
func NewStorageEngine(dataDir, namespace string, conf *EngineConfig) (*Engine, error)
NewStorageEngine initializes WAL, MemTable, and BtreeStore and returns an initialized Engine for a namespace.
func (*Engine) BatchDelete ¶
BatchDelete removes all the given keys and their associated values.
func (*Engine) BatchDeleteRows ¶
func (*Engine) BtreeSnapshot ¶
BtreeSnapshot returns the snapshot of the current btree store.
func (*Engine) CurrentOffset ¶
CurrentOffset returns the current offset that it has seen.
func (*Engine) DeleteColumnsForRow ¶
DeleteColumnsForRow removes the specified columns from the given row key.
func (*Engine) DeleteColumnsForRows ¶
DeleteColumnsForRows removes specified columns from multiple rows.
func (*Engine) GetRowColumns ¶
func (e *Engine) GetRowColumns(rowKey string, predicate func(columnKey string) bool) (map[string][]byte, error)
GetRowColumns returns all the column values associated with the row. If a predicate function is provided, it filters the columns and returns only those for which the predicate returns true.
func (*Engine) GetWalCheckPoint ¶
GetWalCheckPoint returns the last checkpoint metadata saved in the database.
func (*Engine) NewReader ¶
NewReader returns a reader that reads from the beginning until EOF is encountered. It returns io.EOF when it reaches the end of the file.
func (*Engine) NewReaderWithStart ¶
NewReaderWithStart returns a reader that reads from the provided offset until EOF is encountered. It returns io.EOF when it reaches the end of the file.
func (*Engine) NewTxn ¶
func (e *Engine) NewTxn(txnType logrecord.LogOperationType, valueType logrecord.LogEntryType) (*Txn, error)
NewTxn returns a new initialized batch Txn.
func (*Engine) OpsFlushedCount ¶
OpsFlushedCount returns the total number of Put and Delete operations flushed to BtreeStore.
func (*Engine) OpsReceivedCount ¶
OpsReceivedCount returns the total number of Put and Delete operations received.
func (*Engine) PutColumnsForRow ¶
PutColumnsForRow inserts or updates the provided column entries.
It is an upsert operation: an existing column value is updated to the newer value; otherwise, a new column entry is created for the given row.
func (*Engine) PutColumnsForRows ¶
func (*Engine) RecoveredWALCount ¶
RecoveredWALCount returns the number of WAL entries successfully recovered.
type EngineConfig ¶
type EngineConfig struct { ArenaSize int64 `toml:"arena_size"` WalConfig wal.Config `toml:"wal_config"` BtreeConfig kvdrivers.Config `toml:"btree_config"` DBEngine DBEngine `toml:"db_engine"` }
EngineConfig embeds all the config needed for Engine.
func NewDefaultEngineConfig ¶
func NewDefaultEngineConfig() *EngineConfig
NewDefaultEngineConfig returns an initialized default config for engine.
type Offset ¶
Offset represents the offset in the wal.
func DecodeOffset ¶
DecodeOffset decodes the offset position from a byte slice.
type ReplicaWALHandler ¶
type ReplicaWALHandler struct {
// contains filtered or unexported fields
}
ReplicaWALHandler processes and applies incoming WAL records during replication.
func NewReplicaWALHandler ¶
func NewReplicaWALHandler(engine *Engine) *ReplicaWALHandler
func (*ReplicaWALHandler) ApplyRecord ¶
func (wh *ReplicaWALHandler) ApplyRecord(encodedWal []byte, receivedOffset []byte) error
ApplyRecord validates and applies a WAL record to the mem table, which later gets flushed to the Btree Store.
type Txn ¶
type Txn struct {
// contains filtered or unexported fields
}
Txn ensures atomicity at the WAL. Writes, deletes, and chunks that are part of the batch are not visible until the Txn is committed.
func (*Txn) AppendColumnTxn ¶
AppendColumnTxn appends a column-update type Txn to the WAL for the provided rowKey. Whether the column operations are updates or deletes is decided by the log operation type. A single Txn cannot contain both update and delete operations. The caller can set a column key to an empty value if a delete needs to be part of the same Txn.
func (*Txn) AppendKVTxn ¶
AppendKVTxn appends a key-value pair to the WAL as part of a Txn.