sharding

package
v0.0.0-...-b9d6cac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2025 License: MIT Imports: 9 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitializeMetrics

func InitializeMetrics(prefix string, config *Config) error

func SetGlobalMetrics

func SetGlobalMetrics(prefix string, metricsChan chan interface{})

func StopAndFlushMetrics

func StopAndFlushMetrics()

Types

type Config

type Config struct {
	*ghostferry.Config

	ShardingKey   string
	ShardingValue int64
	SourceDB      string
	TargetDB      string

	SourceReplicationMaster       *ghostferry.DatabaseConfig
	ReplicatedMasterPositionQuery string
	RunFerryFromReplica           bool

	StatsDAddress string

	JoinedTables map[string][]JoinTable

	// IgnoredTables and IncludedTables are mutually exclusive. Specifying both is an error.
	IgnoredTables  []string
	IncludedTables []string

	PrimaryKeyTables []string

	Throttle *ghostferry.LagThrottlerConfig

	// ShardedCopyFilterConfig is used to configure the sharded copy filter query.
	ShardedCopyFilterConfig *ShardedCopyFilterConfig
}

func (*Config) ValidateConfig

func (c *Config) ValidateConfig() error

type IndexConfigPerTable

type IndexConfigPerTable struct {
	IndexHint string
	IndexName string
}

type JoinTable

type JoinTable struct {
	TableName, JoinColumn string
}

type ShardedCopyFilter

type ShardedCopyFilter struct {
	ShardingKey      string
	ShardingValue    interface{}
	JoinedTables     map[string][]JoinTable
	PrimaryKeyTables map[string]struct{}

	IndexHint           string
	IndexConfigPerTable map[string]IndexConfigPerTable
	// contains filtered or unexported fields
}

func NewShardedCopyFilter

func NewShardedCopyFilter(config *Config) *ShardedCopyFilter

func (*ShardedCopyFilter) ApplicableEvent

func (f *ShardedCopyFilter) ApplicableEvent(event ghostferry.DMLEvent) (bool, error)

func (*ShardedCopyFilter) BuildSelect

func (f *ShardedCopyFilter) BuildSelect(columns []string, table *ghostferry.TableSchema, lastPaginationKey, batchSize uint64) (sq.SelectBuilder, error)

type ShardedCopyFilterConfig

type ShardedCopyFilterConfig struct {

	// See https://dev.mysql.com/doc/refman/8.0/en/index-hints.html for more information on index hints.
	IndexHint string // none or force or use

	// IndexHintingPerTable has greatest specificity and takes precedence over the other options if specified.
	// Otherwise it will inherit the higher level IndexHint.
	// IndexName option is ignored if IndexHint is "none".
	// IndexName option is ignored if it is not an index on the table.
	//
	// example:
	// IndexHintingPerTable: {
	//   "blog": {
	//     "users": {
	//       "IndexHint": "force",
	//       "IndexName": "ix_users_some_id"
	//     }
	//   }
	// }
	IndexHintingPerTable map[string]map[string]IndexConfigPerTable
}

func (*ShardedCopyFilterConfig) IndexConfigForTables

func (c *ShardedCopyFilterConfig) IndexConfigForTables(schemaName string) map[string]IndexConfigPerTable

SchemaName => TableName => IndexConfig

func (*ShardedCopyFilterConfig) Validate

func (c *ShardedCopyFilterConfig) Validate() error

type ShardedTableFilter

type ShardedTableFilter struct {
	SourceShard      string
	ShardingKey      string
	JoinedTables     map[string][]JoinTable
	Type             ShardedTableFilterType
	Tables           []*regexp.Regexp
	PrimaryKeyTables map[string]struct{}
}

func (*ShardedTableFilter) ApplicableDatabases

func (s *ShardedTableFilter) ApplicableDatabases(dbs []string) ([]string, error)

func (*ShardedTableFilter) ApplicableTables

func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) (applicable []*ghostferry.TableSchema, err error)

type ShardedTableFilterType

type ShardedTableFilterType int64
const (
	IgnoredTablesFilter ShardedTableFilterType = iota
	IncludedTablesFilter
)

type ShardingFerry

type ShardingFerry struct {
	Ferry *ghostferry.Ferry
	// contains filtered or unexported fields
}

func NewFerry

func NewFerry(config *Config) (*ShardingFerry, error)

func (*ShardingFerry) AbortIfTargetDbNoLongerWriteable

func (r *ShardingFerry) AbortIfTargetDbNoLongerWriteable()

func (*ShardingFerry) Initialize

func (r *ShardingFerry) Initialize() error

func (*ShardingFerry) Run

func (r *ShardingFerry) Run()

func (*ShardingFerry) Start

func (r *ShardingFerry) Start() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳