tidb

package
v0.0.19
Published: Feb 13, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Copyright © 2020 Marvin

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

const BatchVersion uint64 = 1

BatchVersion represents the version of the TiCDC batch key-value format.

Variables

var ColumnTypeMap = map[ColumnType]string{
	TypeUnspecified:       "NONE",
	TypeTinyint:           "TINYINT",
	TypeBool:              "BOOL",
	TypeSmallint:          "SMALLINT",
	TypeInt:               "INT",
	TypeFloat:             "FLOAT",
	TypeDouble:            "DOUBLE",
	TypeNull:              "NULL",
	TypeTimestamp:         "TIMESTAMP",
	TypeBigint:            "BIGINT",
	TypeMediumint:         "MEDIUMINT",
	TypeDate:              "DATE",
	TypeTime:              "TIME",
	TypeDatetime:          "DATETIME",
	TypeYear:              "YEAR",
	TypeNewDate:           "DATE",
	TypeVarchar:           "VARCHAR",
	TypeBit:               "BIT",
	TypeJSON:              "JSON",
	TypeDecimal:           "DECIMAL",
	TypeEnum:              "ENUM",
	TypeSet:               "SET",
	TypeTinyBlob:          "TINYBLOB",
	TypeMediumBlob:        "MEDIUMBLOB",
	TypeLongBlob:          "LONGBLOB",
	TypeBlob:              "BLOB",
	TypeVarbinary:         "VARBINARY",
	TypeChar:              "CHAR",
	TypeBinary:            "Binary",
	TypeGeometry:          "GEOMETRY",
	TypeTiDBVectorFloat32: "VECTOR",
	TypeTinyText:          "TINYTEXT",
	TypeMediumText:        "MEDIUMTEXT",
	TypeText:              "TEXT",
	TypeLongText:          "LONGTEXT",
}
var DDLTypeMap = map[DDLType]string{
	DDLCreateSchema:                  "create schema",
	DDLDropSchema:                    "drop schema",
	DDLCreateTable:                   "create table",
	DDLCreateTables:                  "create tables",
	DDLDropTable:                     "drop table",
	DDLAddColumn:                     "add column",
	DDLDropColumn:                    "drop column",
	DDLAddIndex:                      "add index",
	DDLDropIndex:                     "drop index",
	DDLAddForeignKey:                 "add foreign key",
	DDLDropForeignKey:                "drop foreign key",
	DDLTruncateTable:                 "truncate table",
	DDLModifyColumn:                  "modify column",
	DDLRebaseAutoID:                  "rebase auto_increment ID",
	DDLRenameTable:                   "rename table",
	DDLRenameTables:                  "rename tables",
	DDLSetDefaultValue:               "set default value",
	DDLShardRowID:                    "shard row ID",
	DDLModifyTableComment:            "modify table comment",
	DDLRenameIndex:                   "rename index",
	DDLAddTablePartition:             "add partition",
	DDLDropTablePartition:            "drop partition",
	DDLCreateView:                    "create view",
	DDLModifyTableCharsetAndCollate:  "modify table charset and collate",
	DDLTruncateTablePartition:        "truncate partition",
	DDLDropView:                      "drop view",
	DDLRecoverTable:                  "recover table",
	DDLModifySchemaCharsetAndCollate: "modify schema charset and collate",
	DDLLockTable:                     "lock table",
	DDLUnlockTable:                   "unlock table",
	DDLRepairTable:                   "repair table",
	DDLSetTiFlashReplica:             "set tiflash replica",
	DDLUpdateTiFlashReplicaStatus:    "update tiflash replica status",
	DDLAddPrimaryKey:                 "add primary key",
	DDLDropPrimaryKey:                "drop primary key",
	DDLCreateSequence:                "create sequence",
	DDLAlterSequence:                 "alter sequence",
	DDLDropSequence:                  "drop sequence",
	DDLModifyTableAutoIDCache:        "modify auto id cache",
	DDLRebaseAutoRandomBase:          "rebase auto_random ID",
	DDLAlterIndexVisibility:          "alter index visibility",
	DDLExchangeTablePartition:        "exchange partition",
	DDLAddCheckConstraint:            "add check constraint",
	DDLDropCheckConstraint:           "drop check constraint",
	DDLAlterCheckConstraint:          "alter check constraint",
	DDLAlterTableAttributes:          "alter table attributes",
	DDLAlterTablePartitionPlacement:  "alter table partition placement",
	DDLAlterTablePartitionAttributes: "alter table partition attributes",
	DDLCreatePlacementPolicy:         "create placement policy",
	DDLAlterPlacementPolicy:          "alter placement policy",
	DDLDropPlacementPolicy:           "drop placement policy",
	DDLModifySchemaDefaultPlacement:  "modify schema default placement",
	DDLAlterTablePlacement:           "alter table placement",
	DDLAlterCacheTable:               "alter table cache",
	DDLAlterNoCacheTable:             "alter table nocache",
	DDLAlterTableStatsOptions:        "alter table statistics options",
	DDLMultiSchemaChange:             "alter table multi-schema change",
	DDLFlashbackCluster:              "flashback cluster",
	DDLRecoverSchema:                 "flashback schema",
	DDLReorganizePartition:           "alter table reorganize partition",
	DDLAlterTTLInfo:                  "alter table ttl",
	DDLAlterTTLRemove:                "alter table no_ttl",
	DDLCreateResourceGroup:           "create resource group",
	DDLAlterResourceGroup:            "alter resource group",
	DDLDropResourceGroup:             "drop resource group",
	DDLAlterTablePartitioning:        "alter table partition by",
	DDLRemovePartitioning:            "alter table remove partitioning",
	DDLAddVectorIndex:                "add vector index",
	// contains filtered or unexported fields
}

DDLTypeMap maps each DDLType to its string description.

Functions

func GetTableSortedColumnChangedEvent

func GetTableSortedColumnChangedEvent(cols []*ConvColumn) (map[string]interface{}, []*columnAttr, map[string]string, map[string]interface{})

func QuoteSchemaTable

func QuoteSchemaTable(schema string, table string) string

QuoteSchemaTable quotes a table name

Types

type BatchDecoder

type BatchDecoder struct {
	// contains filtered or unexported fields
}
Solved Case:
	key [A{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":1,"t":1}A{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":3,"t":1}]
	values [I{"u":{"id":{"t":3,"h":true,"f":11,"v":1},"val":{"t":15,"f":64,"v":"aa"}}}I{"u":{"id":{"t":3,"h":true,"f":11,"v":3},"val":{"t":15,"f":64,"v":"cc"}}}]

BatchDecoder decodes the bytes of a batch into the original messages.

func (*BatchDecoder) AddKeyValue

func (b *BatchDecoder) AddKeyValue(key, value []byte) error

AddKeyValue implements the RowEventDecoder interface

func (*BatchDecoder) HasNext

func (b *BatchDecoder) HasNext() (MsgEventType, bool, error)

HasNext implements the RowEventDecoder interface

func (*BatchDecoder) NextDDLEvent

func (b *BatchDecoder) NextDDLEvent() (*DDLChangedEvent, error)

NextDDLEvent implements the RowEventDecoder interface

func (*BatchDecoder) NextResolvedEvent

func (b *BatchDecoder) NextResolvedEvent() (uint64, error)

NextResolvedEvent implements the RowEventDecoder interface

func (*BatchDecoder) NextRowChangedEvent

func (b *BatchDecoder) NextRowChangedEvent() (*RowChangedEvent, error)

NextRowChangedEvent implements the RowEventDecoder interface
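The method set above suggests a pull-style loop: ask HasNext for the type of the next event, then call the matching Next method. The following is a minimal sketch of that loop, not the package's consumer code. The interface rowEventDecoder, the fakeDecoder type, and the simplified string return values are hypothetical stand-ins so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// msgEventType is a local stand-in for the documented MsgEventType.
type msgEventType int

const (
	msgEventTypeUnknown msgEventType = iota
	msgEventTypeRow
	msgEventTypeDDL
	msgEventTypeResolved
)

// rowEventDecoder mirrors the method names documented for BatchDecoder,
// with simplified return types (strings instead of event structs).
type rowEventDecoder interface {
	HasNext() (msgEventType, bool, error)
	NextRowChangedEvent() (string, error)
	NextDDLEvent() (string, error)
	NextResolvedEvent() (uint64, error)
}

// fakeDecoder is a hypothetical in-memory decoder used only to drive the loop.
type fakeDecoder struct {
	kinds []msgEventType
	pos   int
}

func (f *fakeDecoder) HasNext() (msgEventType, bool, error) {
	if f.pos >= len(f.kinds) {
		return msgEventTypeUnknown, false, nil
	}
	return f.kinds[f.pos], true, nil
}

func (f *fakeDecoder) NextRowChangedEvent() (string, error) { f.pos++; return "row", nil }
func (f *fakeDecoder) NextDDLEvent() (string, error)        { f.pos++; return "ddl", nil }
func (f *fakeDecoder) NextResolvedEvent() (uint64, error)   { f.pos++; return 42, nil }

// drain pulls every pending event and dispatches on its type.
func drain(d rowEventDecoder) ([]string, error) {
	var seen []string
	for {
		kind, ok, err := d.HasNext()
		if err != nil {
			return seen, err
		}
		if !ok {
			return seen, nil
		}
		switch kind {
		case msgEventTypeRow:
			ev, err := d.NextRowChangedEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, ev)
		case msgEventTypeDDL:
			ev, err := d.NextDDLEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, ev)
		case msgEventTypeResolved:
			ts, err := d.NextResolvedEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, fmt.Sprintf("resolved@%d", ts))
		default:
			return seen, errors.New("unknown message event type")
		}
	}
}

func main() {
	d := &fakeDecoder{kinds: []msgEventType{msgEventTypeRow, msgEventTypeDDL, msgEventTypeResolved}}
	seen, err := drain(d)
	fmt.Println(seen, err)
}
```

With the real BatchDecoder, AddKeyValue would presumably be called with the Kafka message key and value before the loop starts.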

type Column

type Column struct {
	ColumnType  ColumnType     `json:"t"`
	WhereHandle bool           `json:"h,omitempty"`
	ColumnFlag  ColumnFlagType `json:"f"`
	ColumnValue interface{}    `json:"v"`
}

func FormatColumn

func FormatColumn(c Column) (Column, error)

	TypeTinyBlob   ColumnType = 249 // TINYTEXT/TINYBLOB -> 249
	TypeMediumBlob ColumnType = 250 // MEDIUMTEXT/MEDIUMBLOB -> 250
	TypeLongBlob   ColumnType = 251 // LONGTEXT/LONGBLOB -> 251
	TypeBlob       ColumnType = 252 // TEXT/BLOB -> 252

The same field type ID in a message event generated by TiDB TiCDC may represent different data types, and those data types map to different downstream database types (for example, text -> clob, blob -> blob). The consumer cannot determine the specific downstream data type from the message alone; it must consult downstream metadata to decide whether the value is passed as a string or as []byte.

FormatColumn formats a codec column.

func (*Column) ConvRowChangeColumn

func (c *Column) ConvRowChangeColumn(name string) (*ConvColumn, error)

ConvRowChangeColumn converts from a codec column to a row changed event column.

func (*Column) String

func (m *Column) String() string

type ColumnFlagType

type ColumnFlagType Flag

ColumnFlagType is for encapsulating the flag operations for different flags.

const (
	// BinaryFlag means the column charset is binary
	BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota)
	// HandleKeyFlag means the column is selected as the handle key
	// The handleKey is chosen by the following rules in the order:
	// 1. if the table has primary key, it's the handle key.
	// 2. If the table has not null unique key, it's the handle key.
	// 3. If the table has no primary key and no not null unique key, it has no handleKey.
	HandleKeyFlag
	// GeneratedColumnFlag means the column is a generated column
	GeneratedColumnFlag
	// PrimaryKeyFlag means the column is primary key
	PrimaryKeyFlag
	// UniqueKeyFlag means the column is unique key
	UniqueKeyFlag
	// MultipleKeyFlag means the column is multiple key
	MultipleKeyFlag
	// NullableFlag means the column is nullable
	NullableFlag
	// UnsignedFlag means the column stores an unsigned integer
	UnsignedFlag
)

func (ColumnFlagType) IsBinary

func (b ColumnFlagType) IsBinary() bool

IsBinary shows whether BinaryFlag is set

func (ColumnFlagType) IsGeneratedColumn

func (b ColumnFlagType) IsGeneratedColumn() bool

IsGeneratedColumn shows whether GeneratedColumnFlag is set

func (ColumnFlagType) IsHandleKey

func (b ColumnFlagType) IsHandleKey() bool

IsHandleKey shows whether HandleKeyFlag is set

func (ColumnFlagType) IsMultipleKey

func (b ColumnFlagType) IsMultipleKey() bool

IsMultipleKey shows whether MultipleKeyFlag is set

func (ColumnFlagType) IsNullable

func (b ColumnFlagType) IsNullable() bool

IsNullable shows whether NullableFlag is set

func (ColumnFlagType) IsPrimaryKey

func (b ColumnFlagType) IsPrimaryKey() bool

IsPrimaryKey shows whether PrimaryKeyFlag is set

func (ColumnFlagType) IsUniqueKey

func (b ColumnFlagType) IsUniqueKey() bool

IsUniqueKey shows whether UniqueKeyFlag is set

func (ColumnFlagType) IsUnsigned

func (b ColumnFlagType) IsUnsigned() bool

IsUnsigned shows whether UnsignedFlag is set
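As a worked example of the bit layout, the sample message under BatchDecoder carries "f":11 for the id column and "f":64 for the val column. The sketch below re-declares the same iota-based constants locally and names the bits that are set. The type columnFlag and the helper describe are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

type columnFlag uint64

// Same bit positions as the ColumnFlagType constants.
const (
	binaryFlag columnFlag = 1 << columnFlag(iota)
	handleKeyFlag
	generatedColumnFlag
	primaryKeyFlag
	uniqueKeyFlag
	multipleKeyFlag
	nullableFlag
	unsignedFlag
)

var flagNames = []struct {
	flag columnFlag
	name string
}{
	{binaryFlag, "binary"},
	{handleKeyFlag, "handleKey"},
	{generatedColumnFlag, "generated"},
	{primaryKeyFlag, "primaryKey"},
	{uniqueKeyFlag, "uniqueKey"},
	{multipleKeyFlag, "multipleKey"},
	{nullableFlag, "nullable"},
	{unsignedFlag, "unsigned"},
}

// describe lists the names of all bits set in f, lowest bit first.
func describe(f columnFlag) string {
	var set []string
	for _, fn := range flagNames {
		if f&fn.flag != 0 {
			set = append(set, fn.name)
		}
	}
	return strings.Join(set, "|")
}

func main() {
	fmt.Println(describe(11)) // binary|handleKey|primaryKey (1 + 2 + 8)
	fmt.Println(describe(64)) // nullable
}
```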

type ColumnType

type ColumnType uint64
const (
	TypeUnspecified ColumnType = 0
	TypeTinyint     ColumnType = 1 // TINYINT / BOOL
	TypeSmallint    ColumnType = 2 // SMALLINT
	TypeInt         ColumnType = 3 // INT
	TypeFloat       ColumnType = 4
	TypeDouble      ColumnType = 5
	TypeNull        ColumnType = 6
	TypeTimestamp   ColumnType = 7
	TypeBigint      ColumnType = 8  // BIGINT
	TypeMediumint   ColumnType = 9  // MEDIUMINT
	TypeDate        ColumnType = 10 // Date -> 10/14
	TypeTime        ColumnType = 11
	TypeDatetime    ColumnType = 12
	TypeYear        ColumnType = 13
	TypeNewDate     ColumnType = 14
	// value is encoded as UTF-8; when the upstream type is VARBINARY, invisible characters are escaped
	TypeVarchar ColumnType = 15 // VARCHAR/VARBINARY
	TypeBit     ColumnType = 16

	TypeJSON    ColumnType = 245
	TypeDecimal ColumnType = 246
	TypeEnum    ColumnType = 247
	TypeSet     ColumnType = 248

	// value is encoded as Base64
	TypeTinyBlob   ColumnType = 249 // TINYTEXT/TINYBLOB -> 249
	TypeMediumBlob ColumnType = 250 // MEDIUMTEXT/MEDIUMBLOB -> 250
	TypeLongBlob   ColumnType = 251 // LONGTEXT/LONGBLOB -> 251
	TypeBlob       ColumnType = 252 // TEXT/BLOB -> 252
	TypeVarbinary  ColumnType = 253 // in testing, varchar and varbinary both output 15, so 253 can be ignored for now

	// value is encoded as UTF-8; when the upstream type is BINARY, invisible characters are escaped
	TypeChar ColumnType = 254 // CHAR/BINARY

	// not supported
	TypeGeometry          ColumnType = 255
	TypeTiDBVectorFloat32 ColumnType = 256

	// handles the case where the same ColumnType represents different data types
	TypeBool       ColumnType = 1001
	TypeBinary     ColumnType = 1002
	TypeTinyText   ColumnType = 1003
	TypeMediumText ColumnType = 1004
	TypeText       ColumnType = 1005
	TypeLongText   ColumnType = 1006
)

func (ColumnType) ColumnType

func (c ColumnType) ColumnType() string

type Constraint added in v0.0.19

type Constraint struct {
	Task                 *task.Task
	SchemaRoute          *rule.SchemaRouteRule
	DatabaseS, DatabaseT database.IDatabase
	TaskTables           []string
	TableThread          int
	TableRoutes          []*rule.TableRouteRule
	// contains filtered or unexported fields
}

func (*Constraint) Inspect added in v0.0.19

func (c *Constraint) Inspect(ctx context.Context) error

func (*Constraint) InspectDownstream added in v0.0.19

func (c *Constraint) InspectDownstream(ctx context.Context) error

func (*Constraint) InspectUpstream added in v0.0.19

func (c *Constraint) InspectUpstream(ctx context.Context) error

TiDB TiCDC index-value dispatcher update event compatibility (see https://docs.pingcap.com/zh/tidb/dev/ticdc-split-update-behavior):

	v6.5 [>=v6.5.5]: TiDB versions from v6.5.5 up to, but not including, v7.0.0 are supported.
	v7 and above [>=v7.1.2]: TiDB versions from v7.1.2 onward are supported.
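The two version ranges can be expressed as a small check. This is only a sketch of the rule as stated, reading the bounds as [v6.5.5, v7.0.0) and [v7.1.2, ...). It is not the code behind InspectUpstream. The names version, parseVersion, and supported are hypothetical, and version strings with a suffix after the patch number are not handled.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type version struct{ major, minor, patch int }

// parseVersion parses strings such as "v6.5.5" or "7.1.2".
func parseVersion(s string) (version, error) {
	parts := strings.Split(strings.TrimPrefix(s, "v"), ".")
	if len(parts) != 3 {
		return version{}, fmt.Errorf("invalid version %q", s)
	}
	var nums [3]int
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil {
			return version{}, fmt.Errorf("invalid version %q: %w", s, err)
		}
		nums[i] = n
	}
	return version{nums[0], nums[1], nums[2]}, nil
}

// less reports whether a sorts before b.
func (a version) less(b version) bool {
	if a.major != b.major {
		return a.major < b.major
	}
	if a.minor != b.minor {
		return a.minor < b.minor
	}
	return a.patch < b.patch
}

// supported applies the two ranges: [v6.5.5, v7.0.0) and [v7.1.2, ...).
func supported(v version) bool {
	inV65 := !v.less(version{6, 5, 5}) && v.less(version{7, 0, 0})
	inV7 := !v.less(version{7, 1, 2})
	return inV65 || inV7
}

func main() {
	for _, s := range []string{"v6.5.4", "v6.5.5", "v7.0.0", "v7.1.2"} {
		v, err := parseVersion(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(s, supported(v))
	}
}
```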

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a specific consumer

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup represents the consumer manager

func NewConsumerGroup

func NewConsumerGroup(
	ctx context.Context,
	task *task.Task,
	schemaRoute *rule.SchemaRouteRule,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	consumeTables []string,
	param *pb.CdcConsumeParam,
	partitions []int,
	dbTypeT string,
	databaseS, databaseT database.IDatabase) *ConsumerGroup

func (*ConsumerGroup) AppendDDL added in v0.0.19

func (cg *ConsumerGroup) AppendDDL(key *DDLChangedEvent, value int)

func (*ConsumerGroup) Cancel added in v0.0.19

func (cg *ConsumerGroup) Cancel(partition int) error

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) CommitMessage

func (cg *ConsumerGroup) CommitMessage(c *Consumer, msg kafka.Message) error

func (*ConsumerGroup) ConsumeMessage added in v0.0.19

func (cg *ConsumerGroup) ConsumeMessage(c *Consumer) (bool, error)

func (*ConsumerGroup) Consumers added in v0.0.19

func (cg *ConsumerGroup) Consumers() map[int]*Consumer

func (*ConsumerGroup) Coordinators added in v0.0.19

func (cg *ConsumerGroup) Coordinators(key *DDLChangedEvent) int

func (*ConsumerGroup) Coroutines added in v0.0.19

func (cg *ConsumerGroup) Coroutines(c *Consumer)

func (*ConsumerGroup) Get added in v0.0.19

func (cg *ConsumerGroup) Get(partition int) (*Consumer, error)

func (*ConsumerGroup) GetDDL added in v0.0.19

func (cg *ConsumerGroup) GetDDL() *DDLChangedEvent

func (*ConsumerGroup) IsEventDDLFlush added in v0.0.19

func (cg *ConsumerGroup) IsEventDDLFlush(key *DDLChangedEvent) bool

func (*ConsumerGroup) IsEventResolvedFlush added in v0.0.19

func (cg *ConsumerGroup) IsEventResolvedFlush(resolvedTs uint64) bool

func (*ConsumerGroup) ObsoleteMessages added in v0.0.19

func (cg *ConsumerGroup) ObsoleteMessages(currentTs, nowTs uint64) bool

ObsoleteMessages reports whether an event is obsolete and should be filtered out. It applies in two situations:

	1. At initial startup, events that have already been consumed and synchronized are filtered out.
	2. During operation, the filter is refreshed by the DDL/ResolvedTs event of the corresponding partition. CDC guarantees that all events before a DDL/ResolvedTs event have already been sent, so no later event should carry a Ts smaller than that of the DDL/ResolvedTs event.

func (*ConsumerGroup) Partitions added in v0.0.19

func (cg *ConsumerGroup) Partitions(key *DDLChangedEvent) []int

func (*ConsumerGroup) Pause

func (cg *ConsumerGroup) Pause(partition int) error

func (*ConsumerGroup) PopDDL added in v0.0.19

func (cg *ConsumerGroup) PopDDL()

func (*ConsumerGroup) RemoveDDL added in v0.0.19

func (cg *ConsumerGroup) RemoveDDL(key *DDLChangedEvent) error

func (*ConsumerGroup) Resume

func (cg *ConsumerGroup) Resume(partition int) error

func (*ConsumerGroup) Run

func (cg *ConsumerGroup) Run() error

func (*ConsumerGroup) Set added in v0.0.19

func (cg *ConsumerGroup) Set(partition int, c *Consumer)

func (*ConsumerGroup) Start

func (cg *ConsumerGroup) Start(partition int) error

func (*ConsumerGroup) Stop

func (cg *ConsumerGroup) Stop(partition int) error

func (*ConsumerGroup) WriteMessage

func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)

WriteMessage decodes a Kafka message into an event.

type ConvColumn

type ConvColumn struct {
	ColumnName  string         `json:"columnName"`
	ColumnType  string         `json:"columnType"`
	ColumnFlag  ColumnFlagType `json:"columnFlag"`
	ColumnValue interface{}    `json:"columnValue"`
}

type DDLChangedEvent

type DDLChangedEvent struct {
	CommitTs   uint64  `json:"commitTs"`
	SchemaName string  `json:"schemaName"`
	TableName  string  `json:"tableName"`
	DdlQuery   string  `json:"ddlQuery"`
	DdlType    DDLType `json:"ddlType"`
}

func MsgConvDDLEvent

func MsgConvDDLEvent(caseFieldRuleS string, key *MessageEventKey, value *MessageDDLEventValue) *DDLChangedEvent

func (*DDLChangedEvent) String

func (d *DDLChangedEvent) String() string

type DDLChangedEvents

type DDLChangedEvents []*DDLChangedEvent

DDLChangedEvents is a slice of DDLChangedEvent and implements sort.Interface.

func (*DDLChangedEvents) Add

func (d *DDLChangedEvents) Add(event *DDLChangedEvent)

Add adds a new DDLChangedEvent and keeps the DDLs sorted by CommitTs.
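One way to keep a slice ordered by CommitTs on every insert is a binary search for the insertion point. The sketch below shows that technique. It is an assumption about how such an Add could work, not the package's implementation, which may simply append and call sort.Sort. The types ddlEvent and ddlEvents are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sort"
)

type ddlEvent struct {
	CommitTs uint64
	Query    string
}

type ddlEvents []*ddlEvent

// add inserts ev so the slice stays ordered by CommitTs.
// Events with an equal CommitTs keep their arrival order.
func (d *ddlEvents) add(ev *ddlEvent) {
	s := *d
	// Index of the first element with a strictly larger CommitTs.
	i := sort.Search(len(s), func(i int) bool { return s[i].CommitTs > ev.CommitTs })
	s = append(s, nil)   // grow by one slot
	copy(s[i+1:], s[i:]) // shift the tail right
	s[i] = ev
	*d = s
}

func main() {
	var events ddlEvents
	events.add(&ddlEvent{CommitTs: 30, Query: "drop table t3"})
	events.add(&ddlEvent{CommitTs: 10, Query: "create table t1"})
	events.add(&ddlEvent{CommitTs: 20, Query: "alter table t2"})
	for _, e := range events {
		fmt.Println(e.CommitTs, e.Query)
	}
}
```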

func (DDLChangedEvents) Len

func (d DDLChangedEvents) Len() int

func (DDLChangedEvents) Less

func (d DDLChangedEvents) Less(i, j int) bool

func (DDLChangedEvents) Swap

func (d DDLChangedEvents) Swap(i, j int)

type DDLType

type DDLType int

DDLType is the type of a DDL statement.

const (
	DDLNone                          DDLType = 0
	DDLCreateSchema                  DDLType = 1
	DDLDropSchema                    DDLType = 2
	DDLCreateTable                   DDLType = 3
	DDLDropTable                     DDLType = 4
	DDLAddColumn                     DDLType = 5
	DDLDropColumn                    DDLType = 6
	DDLAddIndex                      DDLType = 7
	DDLDropIndex                     DDLType = 8
	DDLAddForeignKey                 DDLType = 9
	DDLDropForeignKey                DDLType = 10
	DDLTruncateTable                 DDLType = 11
	DDLModifyColumn                  DDLType = 12
	DDLRebaseAutoID                  DDLType = 13
	DDLRenameTable                   DDLType = 14
	DDLSetDefaultValue               DDLType = 15
	DDLShardRowID                    DDLType = 16
	DDLModifyTableComment            DDLType = 17
	DDLRenameIndex                   DDLType = 18
	DDLAddTablePartition             DDLType = 19
	DDLDropTablePartition            DDLType = 20
	DDLCreateView                    DDLType = 21
	DDLModifyTableCharsetAndCollate  DDLType = 22
	DDLTruncateTablePartition        DDLType = 23
	DDLDropView                      DDLType = 24
	DDLRecoverTable                  DDLType = 25
	DDLModifySchemaCharsetAndCollate DDLType = 26
	DDLLockTable                     DDLType = 27
	DDLUnlockTable                   DDLType = 28
	DDLRepairTable                   DDLType = 29
	DDLSetTiFlashReplica             DDLType = 30
	DDLUpdateTiFlashReplicaStatus    DDLType = 31
	DDLAddPrimaryKey                 DDLType = 32
	DDLDropPrimaryKey                DDLType = 33
	DDLCreateSequence                DDLType = 34
	DDLAlterSequence                 DDLType = 35
	DDLDropSequence                  DDLType = 36
	DDLAddColumns                    DDLType = 37 // Deprecated, we use DDLMultiSchemaChange instead.
	DDLDropColumns                   DDLType = 38 // Deprecated, we use DDLMultiSchemaChange instead.
	DDLModifyTableAutoIDCache        DDLType = 39
	DDLRebaseAutoRandomBase          DDLType = 40
	DDLAlterIndexVisibility          DDLType = 41
	DDLExchangeTablePartition        DDLType = 42
	DDLAddCheckConstraint            DDLType = 43
	DDLDropCheckConstraint           DDLType = 44
	DDLAlterCheckConstraint          DDLType = 45

	DDLRenameTables DDLType = 47

	DDLAlterTableAttributes          DDLType = 49
	DDLAlterTablePartitionAttributes DDLType = 50
	DDLCreatePlacementPolicy         DDLType = 51
	DDLAlterPlacementPolicy          DDLType = 52
	DDLDropPlacementPolicy           DDLType = 53
	DDLAlterTablePartitionPlacement  DDLType = 54
	DDLModifySchemaDefaultPlacement  DDLType = 55
	DDLAlterTablePlacement           DDLType = 56
	DDLAlterCacheTable               DDLType = 57
	// not used
	DDLAlterTableStatsOptions DDLType = 58
	DDLAlterNoCacheTable      DDLType = 59
	DDLCreateTables           DDLType = 60
	DDLMultiSchemaChange      DDLType = 61
	DDLFlashbackCluster       DDLType = 62
	DDLRecoverSchema          DDLType = 63
	DDLReorganizePartition    DDLType = 64
	DDLAlterTTLInfo           DDLType = 65
	DDLAlterTTLRemove         DDLType = 67
	DDLCreateResourceGroup    DDLType = 68
	DDLAlterResourceGroup     DDLType = 69
	DDLDropResourceGroup      DDLType = 70
	DDLAlterTablePartitioning DDLType = 71
	DDLRemovePartitioning     DDLType = 72
	DDLAddVectorIndex         DDLType = 73
)

List DDL actions.

func (DDLType) String

func (d DDLType) String() string

String returns the DDL type as a string.

type EventGroup

type EventGroup struct {
	// contains filtered or unexported fields
}

EventGroup stores DDL and DML change event messages.

func NewEventGroup

func NewEventGroup() *EventGroup

NewEventGroup creates a new event group.

func (*EventGroup) Append

func (g *EventGroup) Append(e *RowChangedEvent)

Append appends an event to the event group.

func (*EventGroup) DDLCommitTs

func (g *EventGroup) DDLCommitTs(ddlCommitTs uint64) []*RowChangedEvent

DDLCommitTs returns all events strictly < ddlCommitTs

func (*EventGroup) OrderSortedCommitTs added in v0.0.19

func (g *EventGroup) OrderSortedCommitTs() []uint64

OrderSortedCommitTs extracts and sorts all commit timestamps.

func (*EventGroup) RemoveDDLCommitTs added in v0.0.19

func (g *EventGroup) RemoveDDLCommitTs(ddlCommitTs uint64)

RemoveDDLCommitTs removes the DDL event whose commit ts equals ddlCommitTs.

func (*EventGroup) ResolvedTs

func (g *EventGroup) ResolvedTs(resolveTs uint64) []*RowChangedEvent

ResolvedTs returns the events whose CommitTs is less than or equal to resolveTs and removes them from the original queue.
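The flush-on-resolved behaviour described for ResolvedTs can be sketched as a sort followed by a split at the resolved timestamp. This is a minimal illustration under assumed internals, not the package's code. The types rowEvent and eventGroup are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

type rowEvent struct {
	CommitTs uint64
	Table    string
}

type eventGroup struct {
	events []*rowEvent
}

func (g *eventGroup) append(e *rowEvent) { g.events = append(g.events, e) }

// resolved returns the events with CommitTs <= resolvedTs in CommitTs order
// and drops them from the group; later events stay queued.
func (g *eventGroup) resolved(resolvedTs uint64) []*rowEvent {
	sort.SliceStable(g.events, func(i, j int) bool {
		return g.events[i].CommitTs < g.events[j].CommitTs
	})
	// Index of the first event that is still newer than resolvedTs.
	i := sort.Search(len(g.events), func(i int) bool {
		return g.events[i].CommitTs > resolvedTs
	})
	// Cap the returned slice so a caller's append cannot overwrite the queue.
	out := g.events[:i:i]
	g.events = g.events[i:]
	return out
}

func main() {
	g := &eventGroup{}
	for _, ts := range []uint64{5, 1, 9, 3} {
		g.append(&rowEvent{CommitTs: ts, Table: "t1"})
	}
	flushed := g.resolved(5)
	fmt.Println(len(flushed), len(g.events))
}
```

A strict variant of the same split (CommitTs < ts instead of <=) would match the behaviour documented for DDLCommitTs.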

type Flag

type Flag uint64

Flag is a uint64 used as a 64-bit mask.

func (*Flag) Add

func (f *Flag) Add(flags ...Flag)

Add adds flags.

func (*Flag) Clear

func (f *Flag) Clear()

Clear clears all flags.

func (Flag) HasAll

func (f Flag) HasAll(flags ...Flag) bool

HasAll reports whether all of the given flags are set.

func (Flag) HasOne

func (f Flag) HasOne(flags ...Flag) bool

HasOne reports whether at least one of the given flags is set.

func (*Flag) Remove

func (f *Flag) Remove(flags ...Flag)

Remove removes flags.

type MessageDDLEventValue

type MessageDDLEventValue struct {
	DdlQuery string  `json:"q"`
	DdlType  DDLType `json:"t"`
}

func (*MessageDDLEventValue) Decode

func (m *MessageDDLEventValue) Decode(data []byte) error

func (*MessageDDLEventValue) Encode

func (m *MessageDDLEventValue) Encode() ([]byte, error)

type MessageEventKey

type MessageEventKey struct {
	CommitTs     uint64       `json:"ts"`
	SchemaName   string       `json:"scm,omitempty"`
	TableName    string       `json:"tbl,omitempty"`
	RowID        int64        `json:"rid,omitempty"`
	MsgEventType MsgEventType `json:"t"`
}

func (*MessageEventKey) Decode

func (m *MessageEventKey) Decode(data []byte) error

Decode decodes a message key from a byte slice.

func (*MessageEventKey) Encode

func (m *MessageEventKey) Encode() ([]byte, error)

Encode encodes the message key to a byte slice.
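Given the JSON tags on MessageEventKey, a key such as the one in the BatchDecoder sample can be decoded with encoding/json. The sketch below assumes the key is plain JSON and uses a local mirror struct, eventKey, with a hypothetical helper decodeKey. It does not call the package's own Decode and Encode methods.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventKey mirrors the JSON tags documented for MessageEventKey.
type eventKey struct {
	CommitTs     uint64 `json:"ts"`
	SchemaName   string `json:"scm,omitempty"`
	TableName    string `json:"tbl,omitempty"`
	RowID        int64  `json:"rid,omitempty"`
	MsgEventType int    `json:"t"`
}

// decodeKey parses one JSON-encoded message key.
func decodeKey(data []byte) (*eventKey, error) {
	var k eventKey
	if err := json.Unmarshal(data, &k); err != nil {
		return nil, fmt.Errorf("decode message key: %w", err)
	}
	return &k, nil
}

func main() {
	raw := []byte(`{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":1,"t":1}`)
	k, err := decodeKey(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s.%s ts=%d type=%d\n", k.SchemaName, k.TableName, k.CommitTs, k.MsgEventType)
}
```

Because CommitTs is declared as uint64, encoding/json parses the timestamp as an integer and no precision is lost to float64.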

type MessageRowEventValue

type MessageRowEventValue struct {
	// Update events are uniformly split into Delete/Insert; Before corresponds to Delete.
	// If a before value exists, Upsert corresponds to an Update event; otherwise Upsert corresponds to an Insert event.
	Upsert map[string]Column `json:"u,omitempty"`
	Before map[string]Column `json:"p,omitempty"`
	Delete map[string]Column `json:"d,omitempty"`
}

func (*MessageRowEventValue) Decode

func (m *MessageRowEventValue) Decode(caseFieldRuleS string, data []byte) error

func (*MessageRowEventValue) Encode

func (m *MessageRowEventValue) Encode() ([]byte, error)

type Metadata added in v0.0.19

type Metadata struct {
	TaskName       string
	TaskFlow       string
	TaskMode       string
	SchemaNameS    string
	SchemaNameT    string
	TaskTables     []string
	TableThread    int
	DatabaseT      database.IDatabase
	TableRoutes    []*rule.TableRouteRule
	CaseFieldRuleS string
	CaseFieldRuleT string
}

func (*Metadata) GenDownstream added in v0.0.19

func (m *Metadata) GenDownstream(ctx context.Context) error

func (*Metadata) GenMetadata added in v0.0.19

func (m *Metadata) GenMetadata(ctx context.Context) error

type MetadataCache added in v0.0.19

type MetadataCache struct {
	// contains filtered or unexported fields
}

func NewMetadataCache added in v0.0.19

func NewMetadataCache() *MetadataCache

func (*MetadataCache) All added in v0.0.19

func (m *MetadataCache) All() string

All returns all of the entries in the cache.

func (*MetadataCache) Build added in v0.0.19

func (m *MetadataCache) Build(schemaName, tableName string) string

func (*MetadataCache) Delete added in v0.0.19

func (m *MetadataCache) Delete(schemaName, tableName string)

Delete removes the metadata for a given schema and table name.

func (*MetadataCache) DeleteSchema added in v0.0.19

func (m *MetadataCache) DeleteSchema(schemaName string)

func (*MetadataCache) Get added in v0.0.19

func (m *MetadataCache) Get(schemaName, tableName string) (*metadata, bool)

Get retrieves the metadata for a given schema and table name.

func (*MetadataCache) Set added in v0.0.19

func (m *MetadataCache) Set(schemaName, tableName string, metadata *metadata)

Set sets or updates the metadata for a given schema and table name.

func (*MetadataCache) Size added in v0.0.19

func (m *MetadataCache) Size() int

Size returns the number of entries in the cache.

type MsgEventType

type MsgEventType int

MsgEventType is the type of message, which is used by MqSink and RedoLog.

const (
	// MsgEventTypeUnknown is unknown type of message key
	MsgEventTypeUnknown MsgEventType = iota
	// MsgEventTypeRow is row type of message key
	MsgEventTypeRow
	// MsgEventTypeDDL is ddl type of message key
	MsgEventTypeDDL
	// MsgEventTypeResolved is resolved type of message key
	MsgEventTypeResolved
)

type RowChangedEvent

type RowChangedEvent struct {
	SchemaName string `json:"schemaName"`
	TableName  string `json:"tableName"`
	QueryType  string `json:"queryType"`
	CommitTs   uint64 `json:"commitTS"`

	IsDDL    bool   `json:"isDDL"`
	DdlQuery string `json:"ddlQuery"`

	// A table synchronized by TiCDC must have at least one valid index. A valid index is either:
	// 1. the primary key (PRIMARY KEY), or
	// 2. a unique index (UNIQUE INDEX) in which every column is explicitly defined as NOT NULL in the table structure and none is a virtual generated column (VIRTUAL GENERATED COLUMNS).
	// During data synchronization, TiCDC selects one valid index as the Handle Index and sets the HandleKeyFlag of the columns in that index to 1.
	ValidUniqColumns map[string]interface{} `json:"validUniqColumns"`

	// Columns lists all field names. Kafka messages do not carry field offsets, so the order is not guaranteed to match the order in which the fields were created in the table structure; the fields are sorted by name.
	Columns    []*columnAttr     `json:"columns"`
	ColumnType map[string]string `json:"columnType"`

	NewColumnData map[string]interface{} `json:"newColumnData"`
	// OldColumnData records each column's name and its value before the update. It is populated only when the message is generated by an Update event.
	OldColumnData map[string]interface{} `json:"oldColumnData"`
}

TypeTinyBlob ColumnType = 249 // TINYTEXT/TINYBLOB -> 249
TypeMediumBlob ColumnType = 250 // MEDIUMTEXT/MEDIUMBLOB -> 250
TypeLongBlob ColumnType = 251 // LONGTEXT/LONGBLOB -> 251
TypeBlob ColumnType = 252 // TEXT/BLOB -> 252

The same field type ID in a message event generated by TiDB TiCDC may represent different data types, and those data types map to different downstream database types (for example, text -> clob and blob -> blob). The consumer cannot determine the downstream data type from the message alone, so it consults the downstream metadata to decide whether to pass the value as a string or as []byte.

RowChangedEvent stores a DDL or DML event.

func MsgConvRowChangedEvent

func MsgConvRowChangedEvent(key *MessageEventKey, value *MessageRowEventValue) (*RowChangedEvent, error)

func (*RowChangedEvent) Delete

func (e *RowChangedEvent) Delete(
	dbTypeT string,
	schemaNameT string,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	caseFieldRuleT string) (string, []interface{}, error)

func (*RowChangedEvent) Insert added in v0.0.19

func (e *RowChangedEvent) Insert(
	dbTypeT string,
	schemaNameT string,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	caseFieldRuleT string) (string, []interface{}, error)

func (*RowChangedEvent) String

func (e *RowChangedEvent) String() string

type RowEventDecoder

type RowEventDecoder interface {
	// AddKeyValue adds the received key and value to the decoder, which
	// decodes them into the event format.
	// It should be called before `HasNext`.
	AddKeyValue(key, value []byte) error

	// HasNext returns
	//     1. the type of the next event
	//     2. a bool indicating whether a next event exists
	//     3. an error, if any
	HasNext() (MsgEventType, bool, error)
	// NextResolvedEvent returns the next resolved event if exists
	NextResolvedEvent() (uint64, error)
	// NextRowChangedEvent returns the next row changed event if exists
	NextRowChangedEvent() (*RowChangedEvent, error)
	// NextDDLEvent returns the next DDL event if exists
	NextDDLEvent() (*DDLChangedEvent, error)
}

RowEventDecoder is an abstraction for event decoders. This interface is currently used only for testing.
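The call order described in the interface comments (AddKeyValue first, then HasNext until no event remains, dispatching on the returned type) can be sketched as follows. To keep the example self-contained it uses local stand-ins rather than the package's own types: msgEventType, rowEvent, ddlEvent, decoder, drain, and fakeDecoder are all hypothetical names, and fakeDecoder is a test double that replays a fixed queue instead of decoding real messages.

```go
package main

import "fmt"

// msgEventType mirrors MsgEventType and its iota-based constants.
type msgEventType int

const (
	msgUnknown msgEventType = iota
	msgRow
	msgDDL
	msgResolved
)

// rowEvent and ddlEvent are trimmed stand-ins for RowChangedEvent and DDLChangedEvent.
type rowEvent struct{ Table string }
type ddlEvent struct{ Query string }

// decoder mirrors the shape of RowEventDecoder using the stand-in types.
type decoder interface {
	AddKeyValue(key, value []byte) error
	HasNext() (msgEventType, bool, error)
	NextResolvedEvent() (uint64, error)
	NextRowChangedEvent() (*rowEvent, error)
	NextDDLEvent() (*ddlEvent, error)
}

// drain feeds one message to the decoder and consumes every event it holds.
func drain(d decoder, key, value []byte) ([]string, error) {
	if err := d.AddKeyValue(key, value); err != nil {
		return nil, err
	}
	var seen []string
	for {
		typ, ok, err := d.HasNext()
		if err != nil {
			return seen, err
		}
		if !ok {
			return seen, nil
		}
		switch typ {
		case msgRow:
			ev, err := d.NextRowChangedEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, "row:"+ev.Table)
		case msgDDL:
			ev, err := d.NextDDLEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, "ddl:"+ev.Query)
		case msgResolved:
			ts, err := d.NextResolvedEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, fmt.Sprintf("resolved:%d", ts))
		default:
			return seen, fmt.Errorf("unexpected event type %d", typ)
		}
	}
}

// fakeDecoder replays a fixed queue of event types; it decodes nothing.
type fakeDecoder struct{ queue []msgEventType }

func (f *fakeDecoder) AddKeyValue(key, value []byte) error { return nil }
func (f *fakeDecoder) HasNext() (msgEventType, bool, error) {
	if len(f.queue) == 0 {
		return msgUnknown, false, nil
	}
	return f.queue[0], true, nil
}
func (f *fakeDecoder) pop()                               { f.queue = f.queue[1:] }
func (f *fakeDecoder) NextResolvedEvent() (uint64, error) { f.pop(); return 42, nil }
func (f *fakeDecoder) NextRowChangedEvent() (*rowEvent, error) {
	f.pop()
	return &rowEvent{Table: "t1"}, nil
}
func (f *fakeDecoder) NextDDLEvent() (*ddlEvent, error) {
	f.pop()
	return &ddlEvent{Query: "create"}, nil
}

func main() {
	d := &fakeDecoder{queue: []msgEventType{msgRow, msgDDL, msgResolved}}
	fmt.Println(drain(d, nil, nil))
}
```

Returning an error for an unknown event type, rather than skipping it, is a choice made in this sketch so that the loop cannot spin on an event it never consumes.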

func NewBatchDecoder

func NewBatchDecoder(caseFieldRuleS, msgCompresion string) RowEventDecoder

NewBatchDecoder creates a new BatchDecoder.
