compactor

package
v1.19.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2025 License: Apache-2.0 Imports: 60 Imported by: 1

Documentation

Index

Constants

View Source
// Constants describing the block visit marker file.
const (
	// BlockVisitMarkerFile is the name of the JSON file that records the most
	// recent compactor visit.
	BlockVisitMarkerFile = "visit-mark.json"
	// VisitMarkerVersion1 is the currently supported version of the visit-mark file.
	VisitMarkerVersion1 = 1
)
View Source
// Constants describing the cleaner visit marker file.
const (
	// CleanerVisitMarkerName is the name of the cleaner visit marker file.
	CleanerVisitMarkerName = "cleaner-visit-marker.json"
	// CleanerVisitMarkerVersion1 is the currently supported version of the
	// cleaner visit marker file.
	CleanerVisitMarkerVersion1 = 1
)
View Source
// Constants describing where partition visit markers live and how their
// files are named.
const (
	// PartitionVisitMarkerDirectory is the name of the directory where all visit markers are saved.
	PartitionVisitMarkerDirectory = "visit-marks"
	// PartitionVisitMarkerFileSuffix is the known suffix of the JSON filename
	// that records the most recent compactor visit.
	PartitionVisitMarkerFileSuffix = "visit-mark.json"
	// PartitionVisitMarkerFilePrefix is the known prefix of the JSON filename
	// that records the most recent compactor visit.
	PartitionVisitMarkerFilePrefix = "partition-"
	// PartitionVisitMarkerVersion1 is the currently supported version of the
	// partition visit-mark file.
	PartitionVisitMarkerVersion1 = 1
)
View Source
// Constants related to partitioned group info files.
const (
	// PartitionedGroupDirectory is the directory name "partitioned-groups".
	// NOTE(review): presumably the location under which partitioned group info
	// files are stored — confirm against GetPartitionedGroupFile.
	PartitionedGroupDirectory    = "partitioned-groups"
	// PartitionedGroupInfoVersion1 is version number 1.
	// NOTE(review): presumably the value written to PartitionedGroupInfo.Version — confirm.
	PartitionedGroupInfoVersion1 = 1
)

Variables

View Source
// Sentinel errors related to block visit markers.
// NOTE(review): presumably surfaced by ReadBlockVisitMarker and checked with
// helpers such as IsNotBlockVisitMarkerError — confirm against callers.
var (
	// ErrorBlockVisitMarkerNotFound signals that a block visit marker was not found.
	ErrorBlockVisitMarkerNotFound  = errors.New("block visit marker not found")
	// ErrorUnmarshalBlockVisitMarker signals a failure while unmarshalling block visit marker JSON.
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
	// ErrorNotBlockVisitMarker signals that a file is not a block visit marker.
	ErrorNotBlockVisitMarker       = errors.New("file is not block visit marker")
)
View Source
var (
	// RingOp is the ring operation created by ring.NewOp from a state list
	// containing only ring.ACTIVE and a nil second argument.
	// NOTE(review): presumably this limits compactor ring lookups to ACTIVE
	// instances — confirm against ring.NewOp.
	RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// DefaultBlocksGrouperFactory builds a grouper through
	// compact.NewDefaultGrouperWithMetrics. Only the config, bucket, logger,
	// the no-compaction counter and the two metric bundles are consumed; the
	// visit-marker counters, ring, lifecycler, limits, user ID, no-compaction
	// mark filter and ingestion replication factor are ignored.
	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter, _ int) compact.Grouper {
		// Short aliases keep the long argument list below readable.
		cm, sm := compactorMetrics, syncerMetrics

		grouper := compact.NewDefaultGrouperWithMetrics(
			logger, bkt,
			cfg.AcceptMalformedIndex,
			true, // NOTE(review): presumably the enable-vertical-compaction flag — confirm against Thanos.
			cm.compactions,
			cm.compactionRunsStarted,
			cm.compactionRunsCompleted,
			cm.compactionFailures,
			cm.verticalCompactions,
			sm.BlocksMarkedForDeletion,
			sm.GarbageCollectedBlocks,
			blocksMarkedForNoCompaction,
			metadata.NoneFunc,
			cfg.BlockFilesConcurrency,
			cfg.BlocksFetchConcurrency,
		)
		return grouper
	}

	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ingestionReplicationFactor int) compact.Grouper {
		if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
			return NewPartitionCompactionGrouper(
				ctx,
				logger,
				bkt,
				cfg.AcceptMalformedIndex,
				true,
				blocksMarkedForNoCompaction,
				syncerMetrics,
				compactorMetrics,
				metadata.NoneFunc,
				cfg,
				ring,
				ringLifecycle.Addr,
				ringLifecycle.ID,
				limits,
				userID,
				cfg.BlockFilesConcurrency,
				cfg.BlocksFetchConcurrency,
				cfg.CompactionConcurrency,
				true,
				cfg.CompactionVisitMarkerTimeout,
				noCompactionMarkFilter.NoCompactMarkedBlocks,
				ingestionReplicationFactor)
		} else {
			return NewShuffleShardingGrouper(
				ctx,
				logger,
				bkt,
				cfg.AcceptMalformedIndex,
				true,
				blocksMarkedForNoCompaction,
				metadata.NoneFunc,
				syncerMetrics,
				compactorMetrics,
				cfg,
				ring,
				ringLifecycle.Addr,
				ringLifecycle.ID,
				limits,
				userID,
				cfg.BlockFilesConcurrency,
				cfg.BlocksFetchConcurrency,
				cfg.CompactionConcurrency,
				cfg.CompactionVisitMarkerTimeout,
				blockVisitMarkerReadFailed,
				blockVisitMarkerWriteFailed,
				noCompactionMarkFilter.NoCompactMarkedBlocks)
		}
	}

	// DefaultBlocksCompactorFactory creates a TSDB leveled compactor for the
	// configured block ranges, paired with a planner factory that returns
	// compact.NewPlanner for every tenant. If the leveled compactor cannot be
	// created, the error is returned and both other results are nil.
	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		ranges := cfg.BlockRanges.ToMilliseconds()
		leveled, err := tsdb.NewLeveledCompactor(ctx, reg, logger, ranges, downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		// The planner reads the block ranges from the Config passed to it at
		// call time, not from the cfg captured above.
		newPlanner := func(_ context.Context, _ objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, _ *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner {
			return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
		}

		return leveled, newPlanner, nil
	}

	// ShuffleShardingBlocksCompactorFactory creates a TSDB leveled compactor for
	// the configured block ranges, paired with a planner factory that chooses
	// the planner from the Config it is given: the partitioning strategy yields
	// a PartitionCompactionPlanner, any other value a ShuffleShardingPlanner.
	// If the leveled compactor cannot be created, the error is returned and
	// both other results are nil.
	ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
		leveled, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
		if err != nil {
			return nil, nil, err
		}

		newPlanner := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {
			// Both planners take the block ranges in milliseconds.
			ranges := cfg.BlockRanges.ToMilliseconds()

			// Guard clause: everything except the partitioning strategy uses
			// the shuffle-sharding planner.
			if cfg.CompactionStrategy != util.CompactionStrategyPartitioning {
				return NewShuffleShardingPlanner(ctx, bkt, logger, ranges, noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
			}
			return NewPartitionCompactionPlanner(ctx, bkt, logger, ranges, noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
		}

		return leveled, newPlanner, nil
	}

	// DefaultBlockDeletableCheckerFactory ignores all of its arguments and
	// returns a zero-value compact.DefaultBlockDeletableChecker.
	DefaultBlockDeletableCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger) compact.BlockDeletableChecker {
		return compact.DefaultBlockDeletableChecker{}
	}

	// PartitionCompactionBlockDeletableCheckerFactory returns the checker built
	// by NewPartitionCompactionBlockDeletableChecker. The ctx, bkt and logger
	// arguments are not used.
	PartitionCompactionBlockDeletableCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger) compact.BlockDeletableChecker {
		return NewPartitionCompactionBlockDeletableChecker()
	}

	// DefaultCompactionLifecycleCallbackFactory ignores all of its arguments and
	// returns a zero-value compact.DefaultCompactionLifecycleCallback.
	DefaultCompactionLifecycleCallbackFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ int, _ string, _ string, _ *compactorMetrics) compact.CompactionLifecycleCallback {
		return compact.DefaultCompactionLifecycleCallback{}
	}

	// ShardedCompactionLifecycleCallbackFactory forwards every argument,
	// unchanged and in the same order, to NewShardedCompactionLifecycleCallback
	// and returns the resulting callback.
	ShardedCompactionLifecycleCallbackFactory = func(ctx context.Context, userBucket objstore.InstrumentedBucket, logger log.Logger, metaSyncConcurrency int, compactDir string, userID string, compactorMetrics *compactorMetrics) compact.CompactionLifecycleCallback {
		callback := NewShardedCompactionLifecycleCallback(ctx, userBucket, logger, metaSyncConcurrency, compactDir, userID, compactorMetrics)
		return callback
	}
)
View Source
// Sentinel errors related to partitioned group info files.
// NOTE(review): presumably surfaced by ReadPartitionedGroupInfo and
// ReadPartitionedGroupInfoFile — confirm against their implementations.
var (
	// ErrorPartitionedGroupInfoNotFound signals that partitioned group info was not found.
	ErrorPartitionedGroupInfoNotFound  = errors.New("partitioned group info not found")
	// ErrorUnmarshalPartitionedGroupInfo signals a failure while unmarshalling
	// partitioned group info JSON.
	ErrorUnmarshalPartitionedGroupInfo = errors.New("unmarshal partitioned group info JSON")
)
View Source
var (
	// DUMMY_BLOCK_ID is the zero-value ULID.
	// NOTE(review): presumably used as a placeholder block ID; the name does not
	// follow Go's MixedCaps convention but is exported, so renaming it would
	// break importers.
	DUMMY_BLOCK_ID = ulid.ULID{}
)

Functions

func GetCleanerVisitMarkerFilePath added in v1.18.0

func GetCleanerVisitMarkerFilePath() string

func GetPartitionVisitMarkerDirectoryPath added in v1.19.0

func GetPartitionVisitMarkerDirectoryPath(partitionedGroupID uint32) string

func GetPartitionVisitMarkerFilePath added in v1.19.0

func GetPartitionVisitMarkerFilePath(partitionedGroupID uint32, partitionID int) string

func GetPartitionedGroupFile added in v1.19.0

func GetPartitionedGroupFile(partitionedGroupID uint32) string

func IsBlockVisitMarker added in v1.16.0

func IsBlockVisitMarker(path string) bool

func IsNotBlockVisitMarkerError added in v1.16.0

func IsNotBlockVisitMarkerError(err error) bool

func IsNotPartitionVisitMarkerError added in v1.19.0

func IsNotPartitionVisitMarkerError(err error) bool

func IsPartitionVisitMarker added in v1.19.0

func IsPartitionVisitMarker(path string) bool

func NewBackgroundChunkSeriesSet added in v1.19.0

func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet

func NewShardedPosting added in v1.19.0

func NewShardedPosting(ctx context.Context, postings index.Postings, partitionCount uint64, partitionID uint64, labelsFn func(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error) (index.Postings, map[string]struct{}, error)

func UpdateBlockVisitMarker added in v1.14.0

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error

Types

type BlockDeletableCheckerFactory added in v1.19.0

// BlockDeletableCheckerFactory is the signature of functions that build a
// compact.BlockDeletableChecker from a context, an instrumented bucket and a
// logger.
type BlockDeletableCheckerFactory func(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
) compact.BlockDeletableChecker

type BlockVisitMarker added in v1.14.0

// BlockVisitMarker is the JSON-serialisable record of a visit to a block.
type BlockVisitMarker struct {
	// CompactorID is serialised under the "compactorID" key.
	// NOTE(review): presumably identifies the compactor that visited the block — confirm.
	CompactorID string `json:"compactorID"`
	// VisitTime is a unix timestamp of when the block was visited (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version of the file.
	Version int `json:"version"`
}

func ReadBlockVisitMarker added in v1.14.0

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error)

type BlocksCleaner added in v1.2.0

type BlocksCleaner struct {
	services.Service
	// contains filtered or unexported fields
}

func NewBlocksCleaner added in v1.2.0

func NewBlocksCleaner(
	cfg BlocksCleanerConfig,
	bucketClient objstore.InstrumentedBucket,
	usersScanner *cortex_tsdb.UsersScanner,
	compactionVisitMarkerTimeout time.Duration,
	cfgProvider ConfigProvider,
	logger log.Logger,
	ringLifecyclerID string,
	reg prometheus.Registerer,
	cleanerVisitMarkerTimeout time.Duration,
	cleanerVisitMarkerFileUpdateInterval time.Duration,
	blocksMarkedForDeletion *prometheus.CounterVec,
	remainingPlannedCompactions *prometheus.GaugeVec,
) *BlocksCleaner

type BlocksCleanerConfig added in v1.2.0

// BlocksCleanerConfig is the configuration type accepted by NewBlocksCleaner.
// NOTE(review): several field names mirror fields of the compactor Config;
// presumably they are copied from it — confirm against the caller.
type BlocksCleanerConfig struct {
	DeletionDelay                      time.Duration
	CleanupInterval                    time.Duration
	CleanupConcurrency                 int
	BlockDeletionMarksMigrationEnabled bool          // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
	TenantCleanupDelay                 time.Duration // Delay before removing tenant deletion mark and "debug".
	ShardingStrategy                   string
	CompactionStrategy                 string
}

type BlocksCompactorFactory added in v1.8.0

// BlocksCompactorFactory builds and returns the compactor and planner factory
// to use to compact a tenant's blocks, or an error if they cannot be built.
type BlocksCompactorFactory func(
	ctx context.Context,
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (compact.Compactor, PlannerFactory, error)

BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.

type BlocksGrouperFactory added in v1.8.0

// BlocksGrouperFactory builds and returns the grouper to use to compact a
// tenant's blocks. Implementations are free to ignore arguments they do not
// need.
type BlocksGrouperFactory func(
	ctx context.Context,
	cfg Config,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	blocksMarkedForNoCompact prometheus.Counter,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	syncerMetrics *compact.SyncerMetrics,
	compactorMetrics *compactorMetrics,
	ring *ring.Ring,
	ringLifecycler *ring.Lifecycler,
	limit Limits,
	userID string,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
	ingestionReplicationFactor int,
) compact.Grouper

BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.

type CleanerVisitMarker added in v1.18.0

// CleanerVisitMarker is the JSON-serialisable visit marker of the cleaner.
// A new marker is created with NewCleanerVisitMarker.
type CleanerVisitMarker struct {
	// CompactorID is serialised under the "compactorID" key.
	// NOTE(review): presumably identifies the compactor owning the marker — confirm.
	CompactorID string      `json:"compactorID"`
	// Status is the visit status stored in the marker.
	Status      VisitStatus `json:"status"`
	// VisitTime is a unix timestamp of when the visit happened (mark updated).
	VisitTime int64 `json:"visitTime"`
	// Version of the file.
	Version int `json:"version"`
}

func NewCleanerVisitMarker added in v1.18.0

func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker

func (*CleanerVisitMarker) GetStatus added in v1.18.0

func (b *CleanerVisitMarker) GetStatus() VisitStatus

func (*CleanerVisitMarker) GetVisitMarkerFilePath added in v1.18.0

func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string

func (*CleanerVisitMarker) IsExpired added in v1.18.0

func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool

func (*CleanerVisitMarker) IsVisited added in v1.18.0

func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool

func (*CleanerVisitMarker) String added in v1.18.0

func (b *CleanerVisitMarker) String() string

func (*CleanerVisitMarker) UpdateStatus added in v1.18.0

func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus)

type CompactionLifecycleCallbackFactory added in v1.19.0

// CompactionLifecycleCallbackFactory is the signature of functions that build
// a compact.CompactionLifecycleCallback from the user bucket, logger, meta
// sync concurrency, compaction directory, user ID and compactor metrics.
type CompactionLifecycleCallbackFactory func(
	ctx context.Context,
	userBucket objstore.InstrumentedBucket,
	logger log.Logger,
	metaSyncConcurrency int,
	compactDir string,
	userID string,
	compactorMetrics *compactorMetrics,
) compact.CompactionLifecycleCallback

type Compactor

type Compactor struct {
	services.Service

	// Metrics.
	CompactorStartDurationSeconds  prometheus.Gauge
	CompactionRunsStarted          prometheus.Counter
	CompactionRunsInterrupted      prometheus.Counter
	CompactionRunsCompleted        prometheus.Counter
	CompactionRunsFailed           prometheus.Counter
	CompactionRunsLastSuccess      prometheus.Gauge
	CompactionRunDiscoveredTenants prometheus.Gauge
	CompactionRunSkippedTenants    prometheus.Gauge
	CompactionRunSucceededTenants  prometheus.Gauge
	CompactionRunFailedTenants     prometheus.Gauge
	CompactionRunInterval          prometheus.Gauge
	BlocksMarkedForNoCompaction    prometheus.Counter
	// contains filtered or unexported fields
}

Compactor is a multi-tenant TSDB blocks compactor based on Thanos.

func NewCompactor

func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, ingestionReplicationFactor int) (*Compactor, error)

NewCompactor makes a new Compactor.

func (*Compactor) RingHandler added in v0.7.0

func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request)

type Config

type Config struct {
	BlockRanges                           cortex_tsdb.DurationList `yaml:"block_ranges"`
	BlockSyncConcurrency                  int                      `yaml:"block_sync_concurrency"`
	MetaSyncConcurrency                   int                      `yaml:"meta_sync_concurrency"`
	ConsistencyDelay                      time.Duration            `yaml:"consistency_delay"`
	DataDir                               string                   `yaml:"data_dir"`
	CompactionInterval                    time.Duration            `yaml:"compaction_interval"`
	CompactionRetries                     int                      `yaml:"compaction_retries"`
	CompactionConcurrency                 int                      `yaml:"compaction_concurrency"`
	CleanupInterval                       time.Duration            `yaml:"cleanup_interval"`
	CleanupConcurrency                    int                      `yaml:"cleanup_concurrency"`
	DeletionDelay                         time.Duration            `yaml:"deletion_delay"`
	TenantCleanupDelay                    time.Duration            `yaml:"tenant_cleanup_delay"`
	SkipBlocksWithOutOfOrderChunksEnabled bool                     `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
	BlockFilesConcurrency                 int                      `yaml:"block_files_concurrency"`
	BlocksFetchConcurrency                int                      `yaml:"blocks_fetch_concurrency"`

	// Whether the migration of block deletion marks to the global markers location is enabled.
	BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

	// Compactors sharding.
	ShardingEnabled      bool          `yaml:"sharding_enabled"`
	ShardingStrategy     string        `yaml:"sharding_strategy"`
	ShardingRing         RingConfig    `yaml:"sharding_ring"`
	ShardingPlannerDelay time.Duration `yaml:"sharding_planner_delay"`

	// Compaction strategy.
	CompactionStrategy string `yaml:"compaction_strategy"`

	// Allow downstream projects to customise the blocks compactor.
	BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
	BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

	// Compaction visit marker file config
	CompactionVisitMarkerTimeout            time.Duration `yaml:"compaction_visit_marker_timeout"`
	CompactionVisitMarkerFileUpdateInterval time.Duration `yaml:"compaction_visit_marker_file_update_interval"`

	// Cleaner visit marker file config
	CleanerVisitMarkerTimeout            time.Duration `yaml:"cleaner_visit_marker_timeout"`
	CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`

	AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
	CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
	// contains filtered or unexported fields
}

Config holds the Compactor config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the Compactor flags.

func (*Config) Validate added in v1.6.0

func (cfg *Config) Validate(limits validation.Limits) error

type ConfigProvider added in v1.8.0

// ConfigProvider defines the per-tenant config provider for the Compactor.
// It embeds bucket.TenantConfigProvider.
type ConfigProvider interface {
	bucket.TenantConfigProvider
	// CompactorBlocksRetentionPeriod takes a user and returns a duration.
	// NOTE(review): presumably the blocks retention period configured for that
	// user — confirm against implementations.
	CompactorBlocksRetentionPeriod(user string) time.Duration
}

ConfigProvider defines the per-tenant config provider for the Compactor.

type CortexMetadataFilter added in v1.19.0

// CortexMetadataFilter combines the block.DeduplicateFilter and
// block.MetadataFilter interfaces into a single interface.
type CortexMetadataFilter interface {
	block.DeduplicateFilter
	block.MetadataFilter
}

type LabelRemoverFilter added in v1.2.0

type LabelRemoverFilter struct {
	// contains filtered or unexported fields
}

func NewLabelRemoverFilter added in v1.2.0

func NewLabelRemoverFilter(labels []string) *LabelRemoverFilter

NewLabelRemoverFilter creates a LabelRemoverFilter.

func (*LabelRemoverFilter) Filter added in v1.2.0

Filter modifies external labels of existing blocks, removing given labels from the metadata of blocks that have it.

type Limits added in v1.13.0

// Limits defines limits used by the Compactor. Every method is keyed by user ID.
// NOTE(review): the per-method meanings below are read from the method names —
// confirm against the implementation.
type Limits interface {
	// CompactorTenantShardSize presumably returns the compactor shard size for the user.
	CompactorTenantShardSize(userID string) int
	// CompactorPartitionIndexSizeBytes presumably returns the partition index
	// size limit, in bytes, for the user.
	CompactorPartitionIndexSizeBytes(userID string) int64
	// CompactorPartitionSeriesCount presumably returns the partition series
	// count limit for the user.
	CompactorPartitionSeriesCount(userID string) int64
}

Limits defines limits used by the Compactor.

type Partition added in v1.19.0

// Partition is a JSON-serialisable pairing of a partition ID with a list of
// block ULIDs. It is the element type of PartitionedGroupInfo.Partitions.
type Partition struct {
	// PartitionID is serialised under the "partitionID" key.
	PartitionID int         `json:"partitionID"`
	// Blocks is serialised under the "blocks" key.
	// NOTE(review): presumably the blocks assigned to this partition — confirm.
	Blocks      []ulid.ULID `json:"blocks"`
}

type PartitionCompactionBlockDeletableChecker added in v1.19.0

// PartitionCompactionBlockDeletableChecker is a field-less type that carries a
// CanDelete method; instances are created with
// NewPartitionCompactionBlockDeletableChecker.
type PartitionCompactionBlockDeletableChecker struct{}

func NewPartitionCompactionBlockDeletableChecker added in v1.19.0

func NewPartitionCompactionBlockDeletableChecker() *PartitionCompactionBlockDeletableChecker

func (*PartitionCompactionBlockDeletableChecker) CanDelete added in v1.19.0

type PartitionCompactionGrouper added in v1.19.0

type PartitionCompactionGrouper struct {
	// contains filtered or unexported fields
}

func NewPartitionCompactionGrouper added in v1.19.0

func NewPartitionCompactionGrouper(
	ctx context.Context,
	logger log.Logger,
	bkt objstore.InstrumentedBucket,
	acceptMalformedIndex bool,
	enableVerticalCompaction bool,
	blocksMarkedForNoCompact prometheus.Counter,
	syncerMetrics *compact.SyncerMetrics,
	compactorMetrics *compactorMetrics,
	hashFunc metadata.HashFunc,
	compactorCfg Config,
	ring ring.ReadRing,
	ringLifecyclerAddr string,
	ringLifecyclerID string,
	limits Limits,
	userID string,
	blockFilesConcurrency int,
	blocksFetchConcurrency int,
	compactionConcurrency int,
	doRandomPick bool,
	partitionVisitMarkerTimeout time.Duration,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
	ingestionReplicationFactor int,
) *PartitionCompactionGrouper

func (*PartitionCompactionGrouper) Groups added in v1.19.0

func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error)

Groups function modified from https://github.com/cortexproject/cortex/pull/2616

type PartitionCompactionPlanner added in v1.19.0

type PartitionCompactionPlanner struct {
	// contains filtered or unexported fields
}

func NewPartitionCompactionPlanner added in v1.19.0

func NewPartitionCompactionPlanner(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ranges []int64,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
	ringLifecyclerID string,
	userID string,
	plannerDelay time.Duration,
	partitionVisitMarkerTimeout time.Duration,
	partitionVisitMarkerFileUpdateInterval time.Duration,
	compactorMetrics *compactorMetrics,
) *PartitionCompactionPlanner

func (*PartitionCompactionPlanner) Plan added in v1.19.0

func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error)

func (*PartitionCompactionPlanner) PlanWithPartition added in v1.19.0

func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error)

type PartitionedGroupInfo added in v1.19.0

// PartitionedGroupInfo is the JSON-serialisable description of a partitioned
// group: its ID, its partitions, a range and a creation time.
// NOTE(review): the units of RangeStart, RangeEnd and CreationTime are not
// visible here — confirm before relying on them.
type PartitionedGroupInfo struct {
	PartitionedGroupID uint32      `json:"partitionedGroupID"`
	PartitionCount     int         `json:"partitionCount"`
	Partitions         []Partition `json:"partitions"`
	RangeStart         int64       `json:"rangeStart"`
	RangeEnd           int64       `json:"rangeEnd"`
	CreationTime       int64       `json:"creationTime"`
	// Version of the file.
	Version int `json:"version"`
}

func ReadPartitionedGroupInfo added in v1.19.0

func ReadPartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, partitionedGroupID uint32) (*PartitionedGroupInfo, error)

func ReadPartitionedGroupInfoFile added in v1.19.0

func ReadPartitionedGroupInfoFile(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, partitionedGroupFile string) (*PartitionedGroupInfo, error)

func UpdatePartitionedGroupInfo added in v1.19.0

func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, partitionedGroupInfo PartitionedGroupInfo) (*PartitionedGroupInfo, error)

func (*PartitionedGroupInfo) String added in v1.19.0

func (p *PartitionedGroupInfo) String() string

type PartitionedGroupStatus added in v1.19.0

// PartitionedGroupStatus carries status information for the partitioned group
// identified by PartitionedGroupID.
// NOTE(review): the exact meaning of each flag and counter is read from its
// name only — confirm against the code that fills this struct.
type PartitionedGroupStatus struct {
	PartitionedGroupID        uint32
	CanDelete                 bool
	IsCompleted               bool
	DeleteVisitMarker         bool
	PendingPartitions         int
	InProgressPartitions      int
	PendingOrFailedPartitions []Partition
}

func (PartitionedGroupStatus) String added in v1.19.0

func (s PartitionedGroupStatus) String() string

type PlannerFactory added in v1.13.0

// PlannerFactory is the signature of functions that build a compact.Planner.
// A PlannerFactory is returned by a BlocksCompactorFactory alongside the
// compactor. Implementations are free to ignore arguments they do not need.
type PlannerFactory func(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	cfg Config,
	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
	ringLifecycle *ring.Lifecycler,
	userID string,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	compactorMetrics *compactorMetrics,
) compact.Planner

type RingConfig added in v0.7.0

// RingConfig masks the ring lifecycler config, which contains many options not
// really required by the compactors ring. It exposes only the minimum set of
// options; ToLifecyclerConfig converts it into a ring.LifecyclerConfig.
type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	TokensFilePath         string   `yaml:"tokens_file_path"`
	UnregisterOnShutdown   bool     `yaml:"unregister_on_shutdown"`

	// Injected internally
	ListenPort int `yaml:"-"`

	WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`

	// ObservePeriod is excluded from YAML (tag "-").
	ObservePeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the compactors ring. This config is used to strip the lifecycler config down to the minimum and to avoid confusing the user.

func (*RingConfig) RegisterFlags added in v0.7.0

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this RingConfig to the given FlagSet.

func (*RingConfig) ToLifecyclerConfig added in v0.7.0

func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig

ToLifecyclerConfig returns a LifecyclerConfig based on the compactor ring config.

type ShardedBlockPopulator added in v1.19.0

type ShardedBlockPopulator struct {
	// contains filtered or unexported fields
}

func (ShardedBlockPopulator) PopulateBlock added in v1.19.0

func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.CompactorMetrics, _ log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []tsdb.BlockReader, meta *tsdb.BlockMeta, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, postingsFunc tsdb.IndexReaderPostingsFunc) (err error)

PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects the input blocks to be sorted by mint (minimum time). The main logic is copied from tsdb.DefaultPopulateBlockFunc.

type ShardedCompactionLifecycleCallback added in v1.19.0

type ShardedCompactionLifecycleCallback struct {
	// contains filtered or unexported fields
}

func NewShardedCompactionLifecycleCallback added in v1.19.0

func NewShardedCompactionLifecycleCallback(
	ctx context.Context,
	userBucket objstore.InstrumentedBucket,
	logger log.Logger,
	metaSyncConcurrency int,
	compactDir string,
	userID string,
	compactorMetrics *compactorMetrics,
) *ShardedCompactionLifecycleCallback

func (*ShardedCompactionLifecycleCallback) GetBlockPopulator added in v1.19.0

func (*ShardedCompactionLifecycleCallback) PostCompactionCallback added in v1.19.0

func (c *ShardedCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, logger log.Logger, cg *compact.Group, _ ulid.ULID) error

func (*ShardedCompactionLifecycleCallback) PreCompactionCallback added in v1.19.0

func (c *ShardedCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, logger log.Logger, g *compact.Group, meta []*metadata.Meta) error

type ShuffleShardingGrouper added in v1.13.0

type ShuffleShardingGrouper struct {
	// contains filtered or unexported fields
}

func NewShuffleShardingGrouper added in v1.13.0

func NewShuffleShardingGrouper(
	ctx context.Context,
	logger log.Logger,
	bkt objstore.InstrumentedBucket,
	acceptMalformedIndex bool,
	enableVerticalCompaction bool,
	blocksMarkedForNoCompact prometheus.Counter,
	hashFunc metadata.HashFunc,
	syncerMetrics *compact.SyncerMetrics,
	compactorMetrics *compactorMetrics,
	compactorCfg Config,
	ring ring.ReadRing,
	ringLifecyclerAddr string,
	ringLifecyclerID string,
	limits Limits,
	userID string,
	blockFilesConcurrency int,
	blocksFetchConcurrency int,
	compactionConcurrency int,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
) *ShuffleShardingGrouper

func (*ShuffleShardingGrouper) Groups added in v1.13.0

func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error)

Groups function modified from https://github.com/cortexproject/cortex/pull/2616

type ShuffleShardingPlanner added in v1.13.0

type ShuffleShardingPlanner struct {
	// contains filtered or unexported fields
}

func NewShuffleShardingPlanner added in v1.13.0

func NewShuffleShardingPlanner(
	ctx context.Context,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ranges []int64,
	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
	ringLifecyclerID string,
	blockVisitMarkerTimeout time.Duration,
	blockVisitMarkerFileUpdateInterval time.Duration,
	blockVisitMarkerReadFailed prometheus.Counter,
	blockVisitMarkerWriteFailed prometheus.Counter,
) *ShuffleShardingPlanner

func (*ShuffleShardingPlanner) Plan added in v1.13.0

func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error)

type TimeRangeChecker added in v1.19.0

// TimeRangeChecker holds a status entry per time range and range start.
// Values of this type are produced by NewCompletenessChecker.
type TimeRangeChecker struct {
	// This is a map of timeRange to a map of rangeStart to timeRangeStatus
	TimeRangesStatus map[int64]map[int64]*timeRangeStatus
}

func NewCompletenessChecker added in v1.19.0

func NewCompletenessChecker(blocks map[ulid.ULID]*metadata.Meta, groups []blocksGroupWithPartition, timeRanges []int64) TimeRangeChecker

type VisitMarker added in v1.18.0

// VisitMarker is the interface a visit marker must satisfy to be handled by a
// VisitMarkerManager (see NewVisitMarkerManager).
type VisitMarker interface {
	// GetVisitMarkerFilePath returns the path of the visit marker file.
	GetVisitMarkerFilePath() string
	// UpdateStatus is given an owner identifier and a status.
	// NOTE(review): presumably it records both on the marker — confirm against
	// implementations.
	UpdateStatus(ownerIdentifier string, status VisitStatus)
	// GetStatus returns the marker's visit status.
	GetStatus() VisitStatus
	// IsExpired reports whether the marker is expired for the given timeout.
	IsExpired(visitMarkerTimeout time.Duration) bool
	// String returns a string form of the marker.
	String() string
}

type VisitMarkerManager added in v1.18.0

type VisitMarkerManager struct {
	// contains filtered or unexported fields
}

func NewVisitMarkerManager added in v1.18.0

func NewVisitMarkerManager(
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	ownerIdentifier string,
	visitMarker VisitMarker,
) *VisitMarkerManager

func (*VisitMarkerManager) DeleteVisitMarker added in v1.18.0

func (v *VisitMarkerManager) DeleteVisitMarker(ctx context.Context)

func (*VisitMarkerManager) HeartBeat added in v1.18.0

func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool)

func (*VisitMarkerManager) MarkWithStatus added in v1.18.0

func (v *VisitMarkerManager) MarkWithStatus(ctx context.Context, status VisitStatus)

func (*VisitMarkerManager) ReadVisitMarker added in v1.18.0

func (v *VisitMarkerManager) ReadVisitMarker(ctx context.Context, visitMarker any) error

type VisitStatus added in v1.18.0

// VisitStatus is the string-typed status stored in visit markers.
type VisitStatus string
// The VisitStatus values defined by this package.
const (
	// Pending is the "pending" status.
	Pending    VisitStatus = "pending"
	// InProgress is the "inProgress" status.
	InProgress VisitStatus = "inProgress"
	// Completed is the "completed" status.
	Completed  VisitStatus = "completed"
	// Failed is the "failed" status.
	Failed     VisitStatus = "failed"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳