Documentation
Index
Constants
This section is empty.
Variables
var DefaultConfig = Config{
	MinLevel:        core.LevelStreaming,
	MaxLevel:        core.LevelStreaming,
	MinSegmentAge:   time.Hour,
	MinSegmentCount: 10,
	MinSegmentSize:  1 * 1024 * 1024 * 1024,
	MaxSegmentCount: 10000,
	MaxSegmentSize:  4 * 1024 * 1024 * 1024,
	BatchSize:       10000,
	Delete:          true,
}
DefaultConfig is the default compaction configuration.
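A per-topic configuration can start from DefaultConfig and override individual fields; a minimal sketch (the overridden values are illustrative):

	cfg := DefaultConfig
	cfg.MinSegmentAge = 2 * time.Hour           // wait longer before considering a segment
	cfg.MaxSegmentSize = 8 * 1024 * 1024 * 1024 // allow larger compaction runs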
var (
	// ErrSkipped indicates that compaction was skipped.
	ErrSkipped = errors.New("compaction skipped")
)
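Callers can distinguish a skipped run from a failure with errors.Is; a sketch assuming a hypothetical compact call that may return ErrSkipped:

	if err := compact(); err != nil {
		if errors.Is(err, ErrSkipped) {
			log.Println("compaction skipped, nothing to do")
		} else {
			log.Printf("compaction failed: %v", err)
		}
	}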
Functions
This section is empty.
Types
type Compactor
type Compactor struct {
// contains filtered or unexported fields
}
Compactor encapsulates segment compaction logic.
func NewCompactor
func NewCompactor(store core.SegmentStore) (*Compactor, error)
NewCompactor creates a new Compactor instance.
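A minimal construction sketch, assuming store is an existing core.SegmentStore implementation:

	compactor, err := NewCompactor(store)
	if err != nil {
		log.Fatalf("creating compactor: %v", err)
	}
	_ = compactor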
type Config
type Config struct {
	// Minimum level of segments to include in compaction.
	MinLevel uint32

	// Maximum level of segments to include in compaction.
	MaxLevel uint32

	// Minimum age a segment must reach in order to be considered for
	// compaction.
	//
	// A higher value increases the chance that the segment has been
	// replicated to all destinations and mitigates issues related to the
	// S3 eventual consistency model.
	MinSegmentAge time.Duration `required:"false" min:"0ms"`

	// Minimum number of segments required for compaction to run.
	MinSegmentCount int `min:"2"`

	// Minimum byte size of segments required for compaction to run.
	MinSegmentSize uint64 `min:"1"`

	// Maximum number of segments compacted in one run.
	MaxSegmentCount int `min:"2"`

	// Maximum byte size of segments compacted in one run.
	MaxSegmentSize uint64 `min:"1"`

	// Number of segment messages to read/write in each request.
	//
	// A higher value usually results in better throughput.
	BatchSize int `min:"1"`

	// Allows disabling the removal of compacted segments.
	//
	// In normal operation, it does not make sense to keep the compacted
	// segments around, but it can be useful during troubleshooting.
	Delete bool
}
Config represents the compaction configuration.
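An explicit per-topic Config might look like the following sketch; all values are illustrative, and the min tags above constrain the valid ranges:

	aggressive := &Config{
		MinLevel:        core.LevelStreaming,
		MaxLevel:        core.LevelStreaming,
		MinSegmentAge:   30 * time.Minute,
		MinSegmentCount: 2,
		MinSegmentSize:  64 * 1024 * 1024, // 64 MiB
		MaxSegmentCount: 500,
		MaxSegmentSize:  2 * 1024 * 1024 * 1024, // 2 GiB
		BatchSize:       5000,
		Delete:          true,
	}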
type Controller
type Controller struct {
// contains filtered or unexported fields
}
Controller represents the compaction controller.
func New
func New(config ControllerConfig) (*Controller, error)
New returns a new compaction controller instance.
func (*Controller) Compact
func (c *Controller) Compact()
Compact starts compaction for the currently assigned topic partitions that are not already running or scheduled to run.
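When CronSchedule is not set, Compact must be called explicitly; a sketch that triggers it periodically, where c is a *Controller obtained from New:

	ticker := time.NewTicker(30 * time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		c.Compact() // no-op for partitions already running or scheduled
	}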
type ControllerConfig
type ControllerConfig struct {
	// Consumer provides the group membership functionality used to
	// distribute work across multiple instances. It ensures that only one
	// instance is allowed to process a certain source topic partition at
	// any given moment.
	Consumer core.Factory `required:"true"`

	// SegmentStore provides access to segment contents.
	SegmentStore core.Factory `required:"true"`

	// Unique name that identifies the local region/data center/cloud.
	//
	// Field value is required.
	LocalRegion string `required:"true"`

	// Source Kafka topic names that will be compacted.
	//
	// DefaultConfig is used for topics whose config is not set.
	//
	// Field value is required.
	Topics map[string]*Config `required:"true"`

	// Cron expression that determines the compaction schedule.
	//
	// If not set, automatic compaction is not executed, and the Compact
	// method must be called to trigger the operation.
	CronSchedule string

	// Time zone location used for the cron schedule.
	//
	// Default is the system's local time zone.
	CronLocation *time.Location

	// Maximum number of compactions running simultaneously.
	Parallelism int `min:"1"`
}
ControllerConfig represents the compaction controller configuration.
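Putting it together, a controller might be wired up as in this sketch; consumerFactory and storeFactory are hypothetical core.Factory values, the nil topic config is assumed to fall back to DefaultConfig per the Topics documentation, and the cron expression format depends on the scheduler in use:

	ctrl, err := New(ControllerConfig{
		Consumer:     consumerFactory, // hypothetical core.Factory
		SegmentStore: storeFactory,    // hypothetical core.Factory
		LocalRegion:  "us-east-1",
		Topics: map[string]*Config{
			"events": nil, // assumed to select DefaultConfig
		},
		CronSchedule: "0 2 * * *", // daily at 02:00
		CronLocation: time.UTC,
		Parallelism:  2,
	})
	if err != nil {
		log.Fatal(err)
	}
	ctrl.Compact() // trigger an immediate run in addition to the schedule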