README
¶
Chunk format
|                 |             |
| MagicNumber(4b) | version(1b) |
|                 |             |
--------------------------------------------------
|         block-1 bytes         |  checksum (4b) |
--------------------------------------------------
|         block-2 bytes         |  checksum (4b) |
--------------------------------------------------
|         block-n bytes         |  checksum (4b) |
--------------------------------------------------
| #blocks (uvarint)                              |
--------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
------------------------------------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
------------------------------------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
------------------------------------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
------------------------------------------------------------------------------------------------
| checksum(from #blocks)                                                                       |
------------------------------------------------------------------------------------------------
| metasOffset - offset to the point with #blocks |
--------------------------------------------------
Documentation
¶
Index ¶
- Constants
- Variables
- func ErrTooFarBehind(entryTs, cutoff time.Time) error
- func IsErrTooFarBehind(err error) bool
- func IsOutOfOrderErr(err error) bool
- func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data
- func UncompressedSize(c chunk.Data) (int, bool)
- type Block
- type Chunk
- type Facade
- func (f Facade) Bounds() (time.Time, time.Time)
- func (Facade) Encoding() chunk.Encoding
- func (f Facade) Entries() int
- func (f Facade) LokiChunk() Chunk
- func (f Facade) Marshal(w io.Writer) error
- func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error)
- func (f Facade) Size() int
- func (f Facade) UncompressedSize() int
- func (f *Facade) UnmarshalFromBuf(buf []byte) error
- func (f Facade) Utilization() float64
- type HeadBlock
- type HeadBlockFmt
- type MemChunk
- func (c *MemChunk) Append(entry *logproto.Entry) (bool, error)
- func (c *MemChunk) BlockCount() int
- func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block
- func (c *MemChunk) Bounds() (fromT, toT time.Time)
- func (c *MemChunk) Bytes() ([]byte, error)
- func (c *MemChunk) BytesSize() int
- func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
- func (c *MemChunk) CheckpointSize() (chunk, head int)
- func (c *MemChunk) Close() error
- func (c *MemChunk) CompressedSize() int
- func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
- func (c *MemChunk) Encoding() compression.Codec
- func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, ...) (iter.EntryIterator, error)
- func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
- func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, ...) iter.SampleIterator
- func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
- func (c *MemChunk) Size() int
- func (c *MemChunk) SpaceFor(e *logproto.Entry) bool
- func (c *MemChunk) UncompressedSize() int
- func (c *MemChunk) Utilization() float64
- func (c *MemChunk) WriteTo(w io.Writer) (int64, error)
Constants ¶
const (
ChunkFormatV1 byte
ChunkFormatV2
ChunkFormatV3
ChunkFormatV4
)
const GzipLogChunk = chunk.Encoding(128)
GzipLogChunk is a Cortex encoding type for our chunks.
Deprecated: the chunk encoding/compression format is stored inside the chunk data.
const LogChunk = chunk.Encoding(129)
LogChunk is a cortex encoding type for our chunks.
Variables ¶
var (
ErrChunkFull = errors.New("chunk full")
ErrOutOfOrder = errors.New("entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)
Errors returned by the chunk interface.
var (
// BytesBufferPool is a bytes buffer used for lines decompressed.
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
// LabelsPool is a matrix of bytes buffers used to store label names and values.
// Buckets [8, 16, 32, 64, 128, 256].
// Since we store label names and values, the number of labels we can store is the half the bucket size.
// So we will be able to store from 0 to 128 labels.
LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) })
SymbolsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([]symbol, 0, size) })
// SamplesPool pooling array of samples [512,1024,...,16k]
SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })
// EncodeBufferPool is a pool used to binary encode.
EncodeBufferPool = sync.Pool{
New: func() interface{} {
return &encbuf{
b: make([]byte, 0, 256),
}
},
}
)
var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithStructuredMetadataHeadBlockFmt}
Functions ¶
func ErrTooFarBehind ¶
func ErrTooFarBehind(entryTs, cutoff time.Time) error
func IsErrTooFarBehind ¶
func IsErrTooFarBehind(err error) bool
func IsOutOfOrderErr ¶
func IsOutOfOrderErr(err error) bool
func NewFacade ¶
func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data
NewFacade makes a new Facade.
func UncompressedSize ¶
func UncompressedSize(c chunk.Data) (int, bool)
UncompressedSize is a helper function that hides the type-assertion kludge needed to obtain the uncompressed size of the Cortex interface encoding.Chunk.
Types ¶
type Block ¶
type Block interface {
// MinTime is the minimum time of entries in the block
MinTime() int64
// MaxTime is the maximum time of entries in the block
MaxTime() int64
// Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk.
Offset() int
// Entries is the amount of entries in the block.
Entries() int
// Iterator returns an entry iterator for the block.
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}
Block is a chunk block.
type Chunk ¶
type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
// Append returns true if the entry appended was a duplicate
Append(*logproto.Entry) (bool, error)
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Blocks(mintT, maxtT time.Time) []Block
// Size returns the number of entries in a chunk
Size() int
Bytes() ([]byte, error)
BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation
io.WriterTo
BlockCount() int
Utilization() float64
UncompressedSize() int
CompressedSize() int
Close() error
Encoding() compression.Codec
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}
Chunk is the interface for the compressed logs chunk format.
func NewDumbChunk ¶
func NewDumbChunk() Chunk
NewDumbChunk returns a new chunk with a deliberately naive, unoptimized implementation.
type Facade ¶
type Facade struct {
chunk.Data
// contains filtered or unexported fields
}
Facade provides compatibility with the Cortex chunk type, so that our chunks can be stored using its chunk store.
func (Facade) Rebound ¶
func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error)
func (Facade) Size ¶
func (f Facade) Size() int
Size implements encoding.Chunk, which unfortunately uses the Size method to refer to the byte size and not the entry count like chunkenc.Chunk does.
func (Facade) UncompressedSize ¶
func (f Facade) UncompressedSize() int
func (*Facade) UnmarshalFromBuf ¶
func (f *Facade) UnmarshalFromBuf(buf []byte) error
UnmarshalFromBuf implements chunk.Chunk.
func (Facade) Utilization ¶
func (f Facade) Utilization() float64
Utilization implements encoding.Chunk.
type HeadBlock ¶
type HeadBlock interface {
IsEmpty() bool
CheckpointTo(w io.Writer) error
CheckpointBytes(b []byte) ([]byte, error)
CheckpointSize() int
LoadBytes(b []byte) error
Serialise(pool compression.WriterPool) ([]byte, error)
Reset()
Bounds() (mint, maxt int64)
Entries() int
UncompressedSize() int
Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error)
Append(int64, string, labels.Labels) (bool, error)
Iterator(
ctx context.Context,
direction logproto.Direction,
mint,
maxt int64,
pipeline log.StreamPipeline,
) iter.EntryIterator
SampleIterator(
ctx context.Context,
mint,
maxt int64,
extractor log.StreamSampleExtractor,
) iter.SampleIterator
Format() HeadBlockFmt
}
func HeadFromCheckpoint ¶
func HeadFromCheckpoint(b []byte, desiredIfNotUnordered HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error)
HeadFromCheckpoint handles reading any head block format and returning the desired form. This is particularly helpful when replaying WALs written under different configurations, such as after enabling unordered writes.
type HeadBlockFmt ¶
type HeadBlockFmt byte
const (
OrderedHeadBlockFmt HeadBlockFmt
UnorderedHeadBlockFmt
UnorderedWithStructuredMetadataHeadBlockFmt
)
func ChunkHeadFormatFor ¶
func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt
ChunkHeadFormatFor returns corresponding head block format for the given `chunkfmt`.
type MemChunk ¶
type MemChunk struct {
// contains filtered or unexported fields
}
MemChunk implements compressed log chunks.
func MemchunkFromCheckpoint ¶
func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error)
func NewByteChunk ¶
func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error)
NewByteChunk returns a MemChunk on the passed bytes.
func NewMemChunk ¶
func NewMemChunk(chunkFormat byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk
NewMemChunk returns a new in-mem chunk.
func (*MemChunk) Append ¶
func (c *MemChunk) Append(entry *logproto.Entry) (bool, error)
Append implements Chunk. The MemChunk may return true or false, depending on what the head block returns.
func (*MemChunk) Blocks ¶
func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block
Blocks implements Chunk.
func (*MemChunk) Bounds ¶
func (c *MemChunk) Bounds() (fromT, toT time.Time)
Bounds implements Chunk.
func (*MemChunk) Bytes ¶
func (c *MemChunk) Bytes() ([]byte, error)
Bytes implements Chunk. NOTE: Does not cut head block or include any head block data.
func (*MemChunk) BytesSize ¶
func (c *MemChunk) BytesSize() int
BytesSize returns the raw size of the chunk. NOTE: This does not account for the head block nor include any head block data.
func (*MemChunk) BytesWith ¶
func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
BytesWith uses a provided []byte for buffer instantiation. NOTE: This does not cut the head block nor include any head block data.
func (*MemChunk) CheckpointSize ¶
func (c *MemChunk) CheckpointSize() (chunk, head int)
func (*MemChunk) Close ¶
func (c *MemChunk) Close() error
Close implements Chunk. TODO: Fix this to check edge cases.
func (*MemChunk) CompressedSize ¶
func (c *MemChunk) CompressedSize() int
CompressedSize implements Chunk.
func (*MemChunk) ConvertHead ¶
func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
func (*MemChunk) Encoding ¶
func (c *MemChunk) Encoding() compression.Codec
Encoding implements Chunk.
func (*MemChunk) Iterator ¶
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
Iterator implements Chunk.
func (*MemChunk) Rebound ¶
func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
Rebound builds a smaller chunk containing only the logs whose timestamps fall between start and end (both inclusive).
func (*MemChunk) SampleIterator ¶
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
SampleIterator implements Chunk.
func (*MemChunk) SerializeForCheckpointTo ¶
func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
SerializeForCheckpointTo serializes the chunk and the head block into separate `io.Writer`s for checkpointing. This ensures that eventually flushed chunks don't have different substructures depending on when they were checkpointed, which in turn lets us maintain a more effective dedupe ratio in storage.
func (*MemChunk) SpaceFor ¶
func (c *MemChunk) SpaceFor(e *logproto.Entry) bool
SpaceFor implements Chunk.
func (*MemChunk) UncompressedSize ¶
func (c *MemChunk) UncompressedSize() int
UncompressedSize implements Chunk.
func (*MemChunk) Utilization ¶
func (c *MemChunk) Utilization() float64
Utilization implements Chunk.