Documentation
¶
Index ¶
- Constants
- Variables
- func GetCleanerVisitMarkerFilePath() string
- func GetPartitionVisitMarkerDirectoryPath(partitionedGroupID uint32) string
- func GetPartitionVisitMarkerFilePath(partitionedGroupID uint32, partitionID int) string
- func GetPartitionedGroupFile(partitionedGroupID uint32) string
- func IsBlockVisitMarker(path string) bool
- func IsNotBlockVisitMarkerError(err error) bool
- func IsNotPartitionVisitMarkerError(err error) bool
- func IsPartitionVisitMarker(path string) bool
- func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet
- func NewShardedPosting(ctx context.Context, postings index.Postings, partitionCount uint64, ...) (index.Postings, map[string]struct{}, error)
- func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, ...) error
- type BlockDeletableCheckerFactory
- type BlockVisitMarker
- type BlocksCleaner
- type BlocksCleanerConfig
- type BlocksCompactorFactory
- type BlocksGrouperFactory
- type CleanerVisitMarker
- func (b *CleanerVisitMarker) GetStatus() VisitStatus
- func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string
- func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool
- func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool
- func (b *CleanerVisitMarker) String() string
- func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus)
- type CompactionLifecycleCallbackFactory
- type Compactor
- type Config
- type ConfigProvider
- type CortexMetadataFilter
- type LabelRemoverFilter
- type Limits
- type Partition
- type PartitionCompactionBlockDeletableChecker
- type PartitionCompactionGrouper
- type PartitionCompactionPlanner
- type PartitionedGroupInfo
- func ReadPartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, ...) (*PartitionedGroupInfo, error)
- func ReadPartitionedGroupInfoFile(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, ...) (*PartitionedGroupInfo, error)
- func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, ...) (*PartitionedGroupInfo, error)
- type PartitionedGroupStatus
- type PlannerFactory
- type RingConfig
- type ShardedBlockPopulator
- type ShardedCompactionLifecycleCallback
- func (c *ShardedCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, logger log.Logger, cg *compact.Group) (tsdb.BlockPopulator, error)
- func (c *ShardedCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, logger log.Logger, cg *compact.Group, _ ulid.ULID) error
- func (c *ShardedCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, logger log.Logger, g *compact.Group, meta []*metadata.Meta) error
- type ShuffleShardingGrouper
- type ShuffleShardingPlanner
- type TimeRangeChecker
- type VisitMarker
- type VisitMarkerManager
- func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context)
- func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, ...)
- func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus)
- func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error
- type VisitStatus
Constants ¶
const ( // BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit. BlockVisitMarkerFile = "visit-mark.json" // VisitMarkerVersion1 is the current supported version of visit-mark file. VisitMarkerVersion1 = 1 )
const ( // CleanerVisitMarkerName is the name of cleaner visit marker file. CleanerVisitMarkerName = "cleaner-visit-marker.json" // CleanerVisitMarkerVersion1 is the current supported version of cleaner visit mark file. CleanerVisitMarkerVersion1 = 1 )
const ( // PartitionVisitMarkerDirectory is the name of directory where all visit markers are saved. PartitionVisitMarkerDirectory = "visit-marks" // PartitionVisitMarkerFileSuffix is the known suffix of json filename for representing the most recent compactor visit. PartitionVisitMarkerFileSuffix = "visit-mark.json" // PartitionVisitMarkerFilePrefix is the known prefix of json filename for representing the most recent compactor visit. PartitionVisitMarkerFilePrefix = "partition-" // PartitionVisitMarkerVersion1 is the current supported version of visit-mark file. PartitionVisitMarkerVersion1 = 1 )
const ( PartitionedGroupDirectory = "partitioned-groups" PartitionedGroupInfoVersion1 = 1 )
Variables ¶
var ( ErrorBlockVisitMarkerNotFound = errors.New("block visit marker not found") ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON") ErrorNotBlockVisitMarker = errors.New("file is not block visit marker") )
var ( RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter, _ int) compact.Grouper { return compact.NewDefaultGrouperWithMetrics( logger, bkt, cfg.AcceptMalformedIndex, true, compactorMetrics.compactions, compactorMetrics.compactionRunsStarted, compactorMetrics.compactionRunsCompleted, compactorMetrics.compactionFailures, compactorMetrics.verticalCompactions, syncerMetrics.BlocksMarkedForDeletion, syncerMetrics.GarbageCollectedBlocks, blocksMarkedForNoCompaction, metadata.NoneFunc, cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency) } ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ingestionReplicationFactor int) compact.Grouper { if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { return NewPartitionCompactionGrouper( ctx, logger, bkt, cfg.AcceptMalformedIndex, true, blocksMarkedForNoCompaction, syncerMetrics, compactorMetrics, metadata.NoneFunc, cfg, ring, ringLifecycle.Addr, ringLifecycle.ID, limits, userID, cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency, cfg.CompactionConcurrency, true, cfg.CompactionVisitMarkerTimeout, noCompactionMarkFilter.NoCompactMarkedBlocks, ingestionReplicationFactor) } else { return NewShuffleShardingGrouper( ctx, logger, bkt, cfg.AcceptMalformedIndex, true, blocksMarkedForNoCompaction, metadata.NoneFunc, syncerMetrics, compactorMetrics, cfg, ring, ringLifecycle.Addr, ringLifecycle.ID, limits, userID, cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency, cfg.CompactionConcurrency, cfg.CompactionVisitMarkerTimeout, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, noCompactionMarkFilter.NoCompactMarkedBlocks) } } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } return compactor, plannerFactory, nil } ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics) } else { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } } return compactor, plannerFactory, nil } DefaultBlockDeletableCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger) compact.BlockDeletableChecker { return compact.DefaultBlockDeletableChecker{} } PartitionCompactionBlockDeletableCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger) compact.BlockDeletableChecker { return NewPartitionCompactionBlockDeletableChecker() } DefaultCompactionLifecycleCallbackFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ int, _ string, _ string, _ *compactorMetrics) compact.CompactionLifecycleCallback { return compact.DefaultCompactionLifecycleCallback{} } ShardedCompactionLifecycleCallbackFactory = func(ctx context.Context, userBucket objstore.InstrumentedBucket, logger log.Logger, metaSyncConcurrency int, compactDir string, userID string, compactorMetrics *compactorMetrics) compact.CompactionLifecycleCallback { return NewShardedCompactionLifecycleCallback( ctx, userBucket, logger, metaSyncConcurrency, compactDir, userID, compactorMetrics, ) } )
var ( ErrorPartitionedGroupInfoNotFound = errors.New("partitioned group info not found") ErrorUnmarshalPartitionedGroupInfo = errors.New("unmarshal partitioned group info JSON") )
var (
DUMMY_BLOCK_ID = ulid.ULID{}
)
Functions ¶
func GetCleanerVisitMarkerFilePath ¶ added in v1.18.0
func GetCleanerVisitMarkerFilePath() string
func GetPartitionVisitMarkerDirectoryPath ¶ added in v1.19.0
func GetPartitionVisitMarkerFilePath ¶ added in v1.19.0
func GetPartitionedGroupFile ¶ added in v1.19.0
func IsBlockVisitMarker ¶ added in v1.16.0
func IsNotBlockVisitMarkerError ¶ added in v1.16.0
func IsNotPartitionVisitMarkerError ¶ added in v1.19.0
func IsPartitionVisitMarker ¶ added in v1.19.0
func NewBackgroundChunkSeriesSet ¶ added in v1.19.0
func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet
func NewShardedPosting ¶ added in v1.19.0
Types ¶
type BlockDeletableCheckerFactory ¶ added in v1.19.0
type BlockDeletableCheckerFactory func( ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, ) compact.BlockDeletableChecker
type BlockVisitMarker ¶ added in v1.14.0
type BlockVisitMarker struct { CompactorID string `json:"compactorID"` // VisitTime is a unix timestamp of when the block was visited (mark updated). VisitTime int64 `json:"visitTime"` // Version of the file. Version int `json:"version"` }
func ReadBlockVisitMarker ¶ added in v1.14.0
func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error)
type BlocksCleaner ¶ added in v1.2.0
func NewBlocksCleaner ¶ added in v1.2.0
func NewBlocksCleaner( cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, compactionVisitMarkerTimeout time.Duration, cfgProvider ConfigProvider, logger log.Logger, ringLifecyclerID string, reg prometheus.Registerer, cleanerVisitMarkerTimeout time.Duration, cleanerVisitMarkerFileUpdateInterval time.Duration, blocksMarkedForDeletion *prometheus.CounterVec, remainingPlannedCompactions *prometheus.GaugeVec, ) *BlocksCleaner
type BlocksCleanerConfig ¶ added in v1.2.0
type BlocksCleanerConfig struct { DeletionDelay time.Duration CleanupInterval time.Duration CleanupConcurrency int BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required. TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug". ShardingStrategy string CompactionStrategy string }
type BlocksCompactorFactory ¶ added in v1.8.0
type BlocksCompactorFactory func( ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer, ) (compact.Compactor, PlannerFactory, error)
BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
type BlocksGrouperFactory ¶ added in v1.8.0
type BlocksGrouperFactory func( ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompact prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycler *ring.Lifecycler, limit Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ingestionReplicationFactor int, ) compact.Grouper
BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
type CleanerVisitMarker ¶ added in v1.18.0
type CleanerVisitMarker struct { CompactorID string `json:"compactorID"` Status VisitStatus `json:"status"` // VisitTime is a unix timestamp of when the partition was visited (mark updated). VisitTime int64 `json:"visitTime"` // Version of the file. Version int `json:"version"` }
func NewCleanerVisitMarker ¶ added in v1.18.0
func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker
func (*CleanerVisitMarker) GetStatus ¶ added in v1.18.0
func (b *CleanerVisitMarker) GetStatus() VisitStatus
func (*CleanerVisitMarker) GetVisitMarkerFilePath ¶ added in v1.18.0
func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string
func (*CleanerVisitMarker) IsExpired ¶ added in v1.18.0
func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool
func (*CleanerVisitMarker) IsVisited ¶ added in v1.18.0
func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool
func (*CleanerVisitMarker) String ¶ added in v1.18.0
func (b *CleanerVisitMarker) String() string
func (*CleanerVisitMarker) UpdateStatus ¶ added in v1.18.0
func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus)
type CompactionLifecycleCallbackFactory ¶ added in v1.19.0
type Compactor ¶
type Compactor struct { services.Service // Metrics. CompactorStartDurationSeconds prometheus.Gauge CompactionRunsStarted prometheus.Counter CompactionRunsInterrupted prometheus.Counter CompactionRunsCompleted prometheus.Counter CompactionRunsFailed prometheus.Counter CompactionRunsLastSuccess prometheus.Gauge CompactionRunDiscoveredTenants prometheus.Gauge CompactionRunSkippedTenants prometheus.Gauge CompactionRunSucceededTenants prometheus.Gauge CompactionRunFailedTenants prometheus.Gauge CompactionRunInterval prometheus.Gauge BlocksMarkedForNoCompaction prometheus.Counter // contains filtered or unexported fields }
Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
func NewCompactor ¶
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, ingestionReplicationFactor int) (*Compactor, error)
NewCompactor makes a new Compactor.
func (*Compactor) RingHandler ¶ added in v0.7.0
func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request)
type Config ¶
type Config struct { BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` ConsistencyDelay time.Duration `yaml:"consistency_delay"` DataDir string `yaml:"data_dir"` CompactionInterval time.Duration `yaml:"compaction_interval"` CompactionRetries int `yaml:"compaction_retries"` CompactionConcurrency int `yaml:"compaction_concurrency"` CleanupInterval time.Duration `yaml:"cleanup_interval"` CleanupConcurrency int `yaml:"cleanup_concurrency"` DeletionDelay time.Duration `yaml:"deletion_delay"` TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"` BlockFilesConcurrency int `yaml:"block_files_concurrency"` BlocksFetchConcurrency int `yaml:"blocks_fetch_concurrency"` // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` // Compactors sharding. ShardingEnabled bool `yaml:"sharding_enabled"` ShardingStrategy string `yaml:"sharding_strategy"` ShardingRing RingConfig `yaml:"sharding_ring"` ShardingPlannerDelay time.Duration `yaml:"sharding_planner_delay"` // Compaction strategy. CompactionStrategy string `yaml:"compaction_strategy"` // Allow downstream projects to customise the blocks compactor. BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"` BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"` // Compaction visit marker file config CompactionVisitMarkerTimeout time.Duration `yaml:"compaction_visit_marker_timeout"` CompactionVisitMarkerFileUpdateInterval time.Duration `yaml:"compaction_visit_marker_file_update_interval"` // Cleaner visit marker file config CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"` CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"` AcceptMalformedIndex bool `yaml:"accept_malformed_index"` CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` // contains filtered or unexported fields }
Config holds the Compactor config.
func (*Config) RegisterFlags ¶
RegisterFlags registers the Compactor flags.
type ConfigProvider ¶ added in v1.8.0
type ConfigProvider interface { bucket.TenantConfigProvider CompactorBlocksRetentionPeriod(user string) time.Duration }
ConfigProvider defines the per-tenant config provider for the Compactor.
type CortexMetadataFilter ¶ added in v1.19.0
type CortexMetadataFilter interface { block.DeduplicateFilter block.MetadataFilter }
type LabelRemoverFilter ¶ added in v1.2.0
type LabelRemoverFilter struct {
// contains filtered or unexported fields
}
func NewLabelRemoverFilter ¶ added in v1.2.0
func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter
NewLabelRemoverFilter creates a LabelRemoverFilter.
func (*LabelRemoverFilter) Filter ¶ added in v1.2.0
func (f *LabelRemoverFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ block.GaugeVec, _ block.GaugeVec) error
Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.
type Limits ¶ added in v1.13.0
type Limits interface { CompactorTenantShardSize(userID string) int CompactorPartitionIndexSizeBytes(userID string) int64 CompactorPartitionSeriesCount(userID string) int64 }
Limits defines limits used by the Compactor.
type PartitionCompactionBlockDeletableChecker ¶ added in v1.19.0
type PartitionCompactionBlockDeletableChecker struct{}
func NewPartitionCompactionBlockDeletableChecker ¶ added in v1.19.0
func NewPartitionCompactionBlockDeletableChecker() *PartitionCompactionBlockDeletableChecker
type PartitionCompactionGrouper ¶ added in v1.19.0
type PartitionCompactionGrouper struct {
// contains filtered or unexported fields
}
func NewPartitionCompactionGrouper ¶ added in v1.19.0
func NewPartitionCompactionGrouper( ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucket, acceptMalformedIndex bool, enableVerticalCompaction bool, blocksMarkedForNoCompact prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, hashFunc metadata.HashFunc, compactorCfg Config, ring ring.ReadRing, ringLifecyclerAddr string, ringLifecyclerID string, limits Limits, userID string, blockFilesConcurrency int, blocksFetchConcurrency int, compactionConcurrency int, doRandomPick bool, partitionVisitMarkerTimeout time.Duration, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ingestionReplicationFactor int, ) *PartitionCompactionGrouper
type PartitionCompactionPlanner ¶ added in v1.19.0
type PartitionCompactionPlanner struct {
// contains filtered or unexported fields
}
func NewPartitionCompactionPlanner ¶ added in v1.19.0
func NewPartitionCompactionPlanner( ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, ranges []int64, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ringLifecyclerID string, userID string, plannerDelay time.Duration, partitionVisitMarkerTimeout time.Duration, partitionVisitMarkerFileUpdateInterval time.Duration, compactorMetrics *compactorMetrics, ) *PartitionCompactionPlanner
func (*PartitionCompactionPlanner) PlanWithPartition ¶ added in v1.19.0
type PartitionedGroupInfo ¶ added in v1.19.0
type PartitionedGroupInfo struct { PartitionedGroupID uint32 `json:"partitionedGroupID"` PartitionCount int `json:"partitionCount"` Partitions []Partition `json:"partitions"` RangeStart int64 `json:"rangeStart"` RangeEnd int64 `json:"rangeEnd"` CreationTime int64 `json:"creationTime"` // Version of the file. Version int `json:"version"` }
func ReadPartitionedGroupInfo ¶ added in v1.19.0
func ReadPartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, partitionedGroupID uint32) (*PartitionedGroupInfo, error)
func ReadPartitionedGroupInfoFile ¶ added in v1.19.0
func ReadPartitionedGroupInfoFile(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, partitionedGroupFile string) (*PartitionedGroupInfo, error)
func UpdatePartitionedGroupInfo ¶ added in v1.19.0
func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, partitionedGroupInfo PartitionedGroupInfo) (*PartitionedGroupInfo, error)
func (*PartitionedGroupInfo) String ¶ added in v1.19.0
func (p *PartitionedGroupInfo) String() string
type PartitionedGroupStatus ¶ added in v1.19.0
type PartitionedGroupStatus struct { PartitionedGroupID uint32 CanDelete bool IsCompleted bool DeleteVisitMarker bool PendingPartitions int InProgressPartitions int PendingOrFailedPartitions []Partition }
func (PartitionedGroupStatus) String ¶ added in v1.19.0
func (s PartitionedGroupStatus) String() string
type PlannerFactory ¶ added in v1.13.0
type PlannerFactory func( ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics, ) compact.Planner
type RingConfig ¶ added in v0.7.0
type RingConfig struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` // Wait ring stability. WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"` WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` TokensFilePath string `yaml:"tokens_file_path"` UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` // Injected internally ListenPort int `yaml:"-"` WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"` ObservePeriod time.Duration `yaml:"-"` }
RingConfig masks the ring lifecycler config which contains many options not really required by the compactors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶ added in v0.7.0
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*RingConfig) ToLifecyclerConfig ¶ added in v0.7.0
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig
ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.
type ShardedBlockPopulator ¶ added in v1.19.0
type ShardedBlockPopulator struct {
// contains filtered or unexported fields
}
func (ShardedBlockPopulator) PopulateBlock ¶ added in v1.19.0
func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.CompactorMetrics, _ log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []tsdb.BlockReader, meta *tsdb.BlockMeta, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, postingsFunc tsdb.IndexReaderPostingsFunc) (err error)
PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects sorted blocks input by mint. The main logic is copied from tsdb.DefaultPopulateBlockFunc
type ShardedCompactionLifecycleCallback ¶ added in v1.19.0
type ShardedCompactionLifecycleCallback struct {
// contains filtered or unexported fields
}
func NewShardedCompactionLifecycleCallback ¶ added in v1.19.0
func NewShardedCompactionLifecycleCallback( ctx context.Context, userBucket objstore.InstrumentedBucket, logger log.Logger, metaSyncConcurrency int, compactDir string, userID string, compactorMetrics *compactorMetrics, ) *ShardedCompactionLifecycleCallback
func (*ShardedCompactionLifecycleCallback) GetBlockPopulator ¶ added in v1.19.0
func (c *ShardedCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, logger log.Logger, cg *compact.Group) (tsdb.BlockPopulator, error)
func (*ShardedCompactionLifecycleCallback) PostCompactionCallback ¶ added in v1.19.0
type ShuffleShardingGrouper ¶ added in v1.13.0
type ShuffleShardingGrouper struct {
// contains filtered or unexported fields
}
func NewShuffleShardingGrouper ¶ added in v1.13.0
func NewShuffleShardingGrouper( ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucket, acceptMalformedIndex bool, enableVerticalCompaction bool, blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, compactorCfg Config, ring ring.ReadRing, ringLifecyclerAddr string, ringLifecyclerID string, limits Limits, userID string, blockFilesConcurrency int, blocksFetchConcurrency int, compactionConcurrency int, blockVisitMarkerTimeout time.Duration, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ) *ShuffleShardingGrouper
type ShuffleShardingPlanner ¶ added in v1.13.0
type ShuffleShardingPlanner struct {
// contains filtered or unexported fields
}
func NewShuffleShardingPlanner ¶ added in v1.13.0
func NewShuffleShardingPlanner( ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, ranges []int64, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ringLifecyclerID string, blockVisitMarkerTimeout time.Duration, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ) *ShuffleShardingPlanner
type TimeRangeChecker ¶ added in v1.19.0
type TimeRangeChecker struct { // This is a map of timeRange to a map of rangeStart to timeRangeStatus TimeRangesStatus map[int64]map[int64]*timeRangeStatus }
func NewCompletenessChecker ¶ added in v1.19.0
type VisitMarker ¶ added in v1.18.0
type VisitMarker interface { GetVisitMarkerFilePath() string UpdateStatus(ownerIdentifier string, status VisitStatus) GetStatus() VisitStatus IsExpired(visitMarkerTimeout time.Duration) bool String() string }
type VisitMarkerManager ¶ added in v1.18.0
type VisitMarkerManager struct {
// contains filtered or unexported fields
}
func NewVisitMarkerManager ¶ added in v1.18.0
func NewVisitMarkerManager( bkt objstore.InstrumentedBucket, logger log.Logger, ownerIdentifier string, visitMarker VisitMarker, ) *VisitMarkerManager
func (*VisitMarkerManager) DeleteVisitMarker ¶ added in v1.18.0
func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context)
func (*VisitMarkerManager) MarkWithStatus ¶ added in v1.18.0
func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus)
func (*VisitMarkerManager) ReadVisitMarker ¶ added in v1.18.0
func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error
type VisitStatus ¶ added in v1.18.0
type VisitStatus string
const ( Pending VisitStatus = "pending" InProgress VisitStatus = "inProgress" Completed VisitStatus = "completed" Failed VisitStatus = "failed" )
Source Files
¶
- background_chunks_series_set.go
- block_visit_marker.go
- blocks_cleaner.go
- cleaner_visit_marker.go
- compactor.go
- compactor_http.go
- compactor_metrics.go
- compactor_ring.go
- label_remover_filter.go
- partition_compaction_complete_checker.go
- partition_compaction_grouper.go
- partition_compaction_planner.go
- partition_visit_marker.go
- partitioned_group_info.go
- sharded_block_populator.go
- sharded_compaction_lifecycle_callback.go
- sharded_posting.go
- shuffle_sharding_grouper.go
- shuffle_sharding_planner.go
- visit_marker.go