Documentation ¶
Overview ¶
Copyright © 2020 Marvin
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func GetTableSortedColumnChangedEvent(cols []*ConvColumn) (map[string]interface{}, []*columnAttr, map[string]string, ...)
- func QuoteSchemaTable(schema string, table string) string
- type BatchDecoder
- func (b *BatchDecoder) AddKeyValue(key, value []byte) error
- func (b *BatchDecoder) HasNext() (MsgEventType, bool, error)
- func (b *BatchDecoder) NextDDLEvent() (*DDLChangedEvent, error)
- func (b *BatchDecoder) NextResolvedEvent() (uint64, error)
- func (b *BatchDecoder) NextRowChangedEvent() (*RowChangedEvent, error)
- type Column
- type ColumnFlagType
- func (b ColumnFlagType) IsBinary() bool
- func (b ColumnFlagType) IsGeneratedColumn() bool
- func (b ColumnFlagType) IsHandleKey() bool
- func (b ColumnFlagType) IsMultipleKey() bool
- func (b ColumnFlagType) IsNullable() bool
- func (b ColumnFlagType) IsPrimaryKey() bool
- func (b ColumnFlagType) IsUniqueKey() bool
- func (b ColumnFlagType) IsUnsigned() bool
- type ColumnType
- type Constraint
- type Consumer
- type ConsumerGroup
- func (cg *ConsumerGroup) AppendDDL(key *DDLChangedEvent, value int)
- func (cg *ConsumerGroup) Cancel(partition int) error
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) CommitMessage(c *Consumer, msg kafka.Message) error
- func (cg *ConsumerGroup) ConsumeMessage(c *Consumer) (bool, error)
- func (cg *ConsumerGroup) Consumers() map[int]*Consumer
- func (cg *ConsumerGroup) Coordinators(key *DDLChangedEvent) int
- func (cg *ConsumerGroup) Coroutines(c *Consumer)
- func (cg *ConsumerGroup) Get(partition int) (*Consumer, error)
- func (cg *ConsumerGroup) GetDDL() *DDLChangedEvent
- func (cg *ConsumerGroup) IsEventDDLFlush(key *DDLChangedEvent) bool
- func (cg *ConsumerGroup) IsEventResolvedFlush(resolvedTs uint64) bool
- func (cg *ConsumerGroup) ObsoleteMessages(currentTs, nowTs uint64) bool
- func (cg *ConsumerGroup) Partitions(key *DDLChangedEvent) []int
- func (cg *ConsumerGroup) Pause(partition int) error
- func (cg *ConsumerGroup) PopDDL()
- func (cg *ConsumerGroup) RemoveDDL(key *DDLChangedEvent) error
- func (cg *ConsumerGroup) Resume(partition int) error
- func (cg *ConsumerGroup) Run() error
- func (cg *ConsumerGroup) Set(partition int, c *Consumer)
- func (cg *ConsumerGroup) Start(partition int) error
- func (cg *ConsumerGroup) Stop(partition int) error
- func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)
- type ConvColumn
- type DDLChangedEvent
- type DDLChangedEvents
- type DDLType
- type EventGroup
- type Flag
- type MessageDDLEventValue
- type MessageEventKey
- type MessageRowEventValue
- type Metadata
- type MetadataCache
- func (m *MetadataCache) All() string
- func (m *MetadataCache) Build(schemaName, tableName string) string
- func (m *MetadataCache) Delete(schemaName, tableName string)
- func (m *MetadataCache) DeleteSchema(schemaName string)
- func (m *MetadataCache) Get(schemaName, tableName string) (*metadata, bool)
- func (m *MetadataCache) Set(schemaName, tableName string, metadata *metadata)
- func (m *MetadataCache) Size() int
- type MsgEventType
- type RowChangedEvent
- func (e *RowChangedEvent) Delete(dbTypeT string, schemaNameT string, tableRoute []*rule.TableRouteRule, ...) (string, []interface{}, error)
- func (e *RowChangedEvent) Insert(dbTypeT string, schemaNameT string, tableRoute []*rule.TableRouteRule, ...) (string, []interface{}, error)
- func (e *RowChangedEvent) String() string
- type RowEventDecoder
Constants ¶
const BatchVersion uint64 = 1
BatchVersion represents the version of ticdc batch key-value format
Variables ¶
var ColumnTypeMap = map[ColumnType]string{ TypeUnspecified: "NONE", TypeTinyint: "TINYINT", TypeBool: "BOOL", TypeSmallint: "SMALLINT", TypeInt: "INT", TypeFloat: "FLOAT", TypeDouble: "DOUBLE", TypeNull: "NULL", TypeTimestamp: "TIMESTAMP", TypeBigint: "BIGINT", TypeMediumint: "MEDIUMINT", TypeDate: "DATE", TypeTime: "TIME", TypeDatetime: "DATETIME", TypeYear: "YEAR", TypeNewDate: "DATE", TypeVarchar: "VARCHAR", TypeBit: "BIT", TypeJSON: "JSON", TypeDecimal: "DECIMAL", TypeEnum: "ENUM", TypeSet: "SET", TypeTinyBlob: "TINYBLOB", TypeMediumBlob: "MEDIUMBLOB", TypeLongBlob: "LONGBLOB", TypeBlob: "BLOB", TypeVarbinary: "VARBINARY", TypeChar: "CHAR", TypeBinary: "Binary", TypeGeometry: "GEOMETRY", TypeTiDBVectorFloat32: "VECTOR", TypeTinyText: "TINYTEXT", TypeMediumText: "MEDIUMTEXT", TypeText: "TEXT", TypeLongText: "LONGTEXT", }
var DDLTypeMap = map[DDLType]string{ DDLCreateSchema: "create schema", DDLDropSchema: "drop schema", DDLCreateTable: "create table", DDLCreateTables: "create tables", DDLDropTable: "drop table", DDLAddColumn: "add column", DDLDropColumn: "drop column", DDLAddIndex: "add index", DDLDropIndex: "drop index", DDLAddForeignKey: "add foreign key", DDLDropForeignKey: "drop foreign key", DDLTruncateTable: "truncate table", DDLModifyColumn: "modify column", DDLRebaseAutoID: "rebase auto_increment ID", DDLRenameTable: "rename table", DDLRenameTables: "rename tables", DDLSetDefaultValue: "set default value", DDLShardRowID: "shard row ID", DDLModifyTableComment: "modify table comment", DDLRenameIndex: "rename index", DDLAddTablePartition: "add partition", DDLDropTablePartition: "drop partition", DDLCreateView: "create view", DDLModifyTableCharsetAndCollate: "modify table charset and collate", DDLTruncateTablePartition: "truncate partition", DDLDropView: "drop view", DDLRecoverTable: "recover table", DDLModifySchemaCharsetAndCollate: "modify schema charset and collate", DDLLockTable: "lock table", DDLUnlockTable: "unlock table", DDLRepairTable: "repair table", DDLSetTiFlashReplica: "set tiflash replica", DDLUpdateTiFlashReplicaStatus: "update tiflash replica status", DDLAddPrimaryKey: "add primary key", DDLDropPrimaryKey: "drop primary key", DDLCreateSequence: "create sequence", DDLAlterSequence: "alter sequence", DDLDropSequence: "drop sequence", DDLModifyTableAutoIDCache: "modify auto id cache", DDLRebaseAutoRandomBase: "rebase auto_random ID", DDLAlterIndexVisibility: "alter index visibility", DDLExchangeTablePartition: "exchange partition", DDLAddCheckConstraint: "add check constraint", DDLDropCheckConstraint: "drop check constraint", DDLAlterCheckConstraint: "alter check constraint", DDLAlterTableAttributes: "alter table attributes", DDLAlterTablePartitionPlacement: "alter table partition placement", DDLAlterTablePartitionAttributes: "alter table partition attributes", 
DDLCreatePlacementPolicy: "create placement policy", DDLAlterPlacementPolicy: "alter placement policy", DDLDropPlacementPolicy: "drop placement policy", DDLModifySchemaDefaultPlacement: "modify schema default placement", DDLAlterTablePlacement: "alter table placement", DDLAlterCacheTable: "alter table cache", DDLAlterNoCacheTable: "alter table nocache", DDLAlterTableStatsOptions: "alter table statistics options", DDLMultiSchemaChange: "alter table multi-schema change", DDLFlashbackCluster: "flashback cluster", DDLRecoverSchema: "flashback schema", DDLReorganizePartition: "alter table reorganize partition", DDLAlterTTLInfo: "alter table ttl", DDLAlterTTLRemove: "alter table no_ttl", DDLCreateResourceGroup: "create resource group", DDLAlterResourceGroup: "alter resource group", DDLDropResourceGroup: "drop resource group", DDLAlterTablePartitioning: "alter table partition by", DDLRemovePartitioning: "alter table remove partitioning", DDLAddVectorIndex: "add vector index", // contains filtered or unexported fields }
DDLTypeMap maps each DDLType to its string name.
Functions ¶
func GetTableSortedColumnChangedEvent ¶
func GetTableSortedColumnChangedEvent(cols []*ConvColumn) (map[string]interface{}, []*columnAttr, map[string]string, map[string]interface{})
func QuoteSchemaTable ¶
func QuoteSchemaTable(schema string, table string) string
QuoteSchemaTable quotes a table name.
Types ¶
type BatchDecoder ¶
type BatchDecoder struct {
// contains filtered or unexported fields
}
Solved Case: key [A{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":1,"t":1}A{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":3,"t":1}] values [I{"u":{"id":{"t":3,"h":true,"f":11,"v":1},"val":{"t":15,"f":64,"v":"aa"}}}I{"u":{"id":{"t":3,"h":true,"f":11,"v":3},"val":{"t":15,"f":64,"v":"cc"}}}]
BatchDecoder decodes the bytes of a batch into the original messages.
func (*BatchDecoder) AddKeyValue ¶
func (b *BatchDecoder) AddKeyValue(key, value []byte) error
AddKeyValue implements the RowEventDecoder interface
func (*BatchDecoder) HasNext ¶
func (b *BatchDecoder) HasNext() (MsgEventType, bool, error)
HasNext implements the RowEventDecoder interface
func (*BatchDecoder) NextDDLEvent ¶
func (b *BatchDecoder) NextDDLEvent() (*DDLChangedEvent, error)
NextDDLEvent implements the RowEventDecoder interface
func (*BatchDecoder) NextResolvedEvent ¶
func (b *BatchDecoder) NextResolvedEvent() (uint64, error)
NextResolvedEvent implements the RowEventDecoder interface
func (*BatchDecoder) NextRowChangedEvent ¶
func (b *BatchDecoder) NextRowChangedEvent() (*RowChangedEvent, error)
NextRowChangedEvent implements the RowEventDecoder interface
type Column ¶
type Column struct { ColumnType ColumnType `json:"t"` WhereHandle bool `json:"h,omitempty"` ColumnFlag ColumnFlagType `json:"f"` ColumnValue interface{} `json:"v"` }
func FormatColumn ¶
TypeTinyBlob ColumnType = 249 // TINYTEXT/TINYBLOB -> 249
TypeMediumBlob ColumnType = 250 // MEDIUMTEXT/MEDIUMBLOB -> 250
TypeLongBlob ColumnType = 251 // LONGTEXT/LONGBLOB -> 251
TypeBlob ColumnType = 252 // TEXT/BLOB -> 252
The same field type ID in a message event generated by TiDB TiCDC may represent different data types, and those data types map to different downstream database types (for example, text -> clob and blob -> blob). The consumer cannot tell the downstream data type from the message alone; it must consult downstream metadata to decide whether to pass the value as a string or as []byte.
FormatColumn formats a codec column.
func (*Column) ConvRowChangeColumn ¶
func (c *Column) ConvRowChangeColumn(name string) (*ConvColumn, error)
ConvRowChangeColumn converts from a codec column to a row changed event column.
type ColumnFlagType ¶
type ColumnFlagType Flag
ColumnFlagType is for encapsulating the flag operations for different flags.
const ( // BinaryFlag means the column charset is binary BinaryFlag ColumnFlagType = 1 << ColumnFlagType(iota) // HandleKeyFlag means the column is selected as the handle key // The handleKey is chosen by the following rules in the order: // 1. if the table has primary key, it's the handle key. // 2. If the table has not null unique key, it's the handle key. // 3. If the table has no primary key and no not null unique key, it has no handleKey. HandleKeyFlag // GeneratedColumnFlag means the column is a generated column GeneratedColumnFlag // PrimaryKeyFlag means the column is primary key PrimaryKeyFlag // UniqueKeyFlag means the column is unique key UniqueKeyFlag // MultipleKeyFlag means the column is multiple key MultipleKeyFlag // NullableFlag means the column is nullable NullableFlag // UnsignedFlag means the column stores an unsigned integer UnsignedFlag )
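The flag constants above are consecutive bits, so a column's `f` value in a message is a bitwise OR of the flags that apply. The sketch below redeclares the constants locally to show how a value such as `11` or `64` (both appear in the sample message under BatchDecoder) breaks down. The underlying type of `Flag` is not shown in these docs, so `uint64` is an assumption, and `has` and `flagNames` are hypothetical helpers rather than part of the package.

```go
package main

import "fmt"

// ColumnFlagType mirrors the documented type. The docs define it as Flag,
// whose underlying type is not shown; uint64 is an assumption.
type ColumnFlagType uint64

const (
	BinaryFlag          ColumnFlagType = 1 << ColumnFlagType(iota) // 1
	HandleKeyFlag                                                  // 2
	GeneratedColumnFlag                                            // 4
	PrimaryKeyFlag                                                 // 8
	UniqueKeyFlag                                                  // 16
	MultipleKeyFlag                                                // 32
	NullableFlag                                                   // 64
	UnsignedFlag                                                   // 128
)

// has reports whether every bit of flag is set in b (hypothetical helper).
func (b ColumnFlagType) has(flag ColumnFlagType) bool { return b&flag == flag }

// flagNames lists the names of the flags set in b, in bit order
// (hypothetical helper).
func flagNames(b ColumnFlagType) []string {
	all := []struct {
		flag ColumnFlagType
		name string
	}{
		{BinaryFlag, "Binary"},
		{HandleKeyFlag, "HandleKey"},
		{GeneratedColumnFlag, "GeneratedColumn"},
		{PrimaryKeyFlag, "PrimaryKey"},
		{UniqueKeyFlag, "UniqueKey"},
		{MultipleKeyFlag, "MultipleKey"},
		{NullableFlag, "Nullable"},
		{UnsignedFlag, "Unsigned"},
	}
	var names []string
	for _, f := range all {
		if b.has(f.flag) {
			names = append(names, f.name)
		}
	}
	return names
}

func main() {
	fmt.Println(flagNames(11)) // [Binary HandleKey PrimaryKey]
	fmt.Println(flagNames(64)) // [Nullable]
}
```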
func (ColumnFlagType) IsBinary ¶
func (b ColumnFlagType) IsBinary() bool
IsBinary shows whether BinaryFlag is set
func (ColumnFlagType) IsGeneratedColumn ¶
func (b ColumnFlagType) IsGeneratedColumn() bool
IsGeneratedColumn shows whether GeneratedColumnFlag is set
func (ColumnFlagType) IsHandleKey ¶
func (b ColumnFlagType) IsHandleKey() bool
IsHandleKey shows whether HandleKeyFlag is set
func (ColumnFlagType) IsMultipleKey ¶
func (b ColumnFlagType) IsMultipleKey() bool
IsMultipleKey shows whether MultipleKeyFlag is set
func (ColumnFlagType) IsNullable ¶
func (b ColumnFlagType) IsNullable() bool
IsNullable shows whether NullableFlag is set
func (ColumnFlagType) IsPrimaryKey ¶
func (b ColumnFlagType) IsPrimaryKey() bool
IsPrimaryKey shows whether PrimaryKeyFlag is set
func (ColumnFlagType) IsUniqueKey ¶
func (b ColumnFlagType) IsUniqueKey() bool
IsUniqueKey shows whether UniqueKeyFlag is set
func (ColumnFlagType) IsUnsigned ¶
func (b ColumnFlagType) IsUnsigned() bool
IsUnsigned shows whether UnsignedFlag is set
type ColumnType ¶
type ColumnType uint64
const (
	TypeUnspecified ColumnType = 0
	TypeTinyint ColumnType = 1 // TINYINT / BOOL
	TypeSmallint ColumnType = 2 // SMALLINT
	TypeInt ColumnType = 3 // INT
	TypeFloat ColumnType = 4
	TypeDouble ColumnType = 5
	TypeNull ColumnType = 6
	TypeTimestamp ColumnType = 7
	TypeBigint ColumnType = 8 // BIGINT
	TypeMediumint ColumnType = 9 // MEDIUMINT
	TypeDate ColumnType = 10 // Date -> 10/14
	TypeTime ColumnType = 11
	TypeDatetime ColumnType = 12
	TypeYear ColumnType = 13
	TypeNewDate ColumnType = 14
	// The value is encoded as UTF-8; when the upstream type is VARBINARY, invisible characters are escaped.
	TypeVarchar ColumnType = 15 // VARCHAR/VARBINARY
	TypeBit ColumnType = 16
	TypeJSON ColumnType = 245
	TypeDecimal ColumnType = 246
	TypeEnum ColumnType = 247
	TypeSet ColumnType = 248
	// The value is encoded as Base64.
	TypeTinyBlob ColumnType = 249 // TINYTEXT/TINYBLOB -> 249
	TypeMediumBlob ColumnType = 250 // MEDIUMTEXT/MEDIUMBLOB -> 250
	TypeLongBlob ColumnType = 251 // LONGTEXT/LONGBLOB -> 251
	TypeBlob ColumnType = 252 // TEXT/BLOB -> 252
	TypeVarbinary ColumnType = 253 // In testing, varchar and varbinary both output 15, so 253 can be ignored for now.
	// The value is encoded as UTF-8; when the upstream type is BINARY, invisible characters are escaped.
	TypeChar ColumnType = 254 // CHAR/BINARY
	// Not supported.
	TypeGeometry ColumnType = 255
	TypeTiDBVectorFloat32 ColumnType = 256
	// Handle the case where one ColumnType represents different data types.
	TypeBool ColumnType = 1001
	TypeBinary ColumnType = 1002
	TypeTinyText ColumnType = 1003
	TypeMediumText ColumnType = 1004
	TypeText ColumnType = 1005
	TypeLongText ColumnType = 1006
)
func (ColumnType) ColumnType ¶
func (c ColumnType) ColumnType() string
type Constraint ¶ added in v0.0.19
type Constraint struct { Task *task.Task SchemaRoute *rule.SchemaRouteRule DatabaseS, DatabaseT database.IDatabase TaskTables []string TableThread int TableRoutes []*rule.TableRouteRule // contains filtered or unexported fields }
func (*Constraint) Inspect ¶ added in v0.0.19
func (c *Constraint) Inspect(ctx context.Context) error
func (*Constraint) InspectDownstream ¶ added in v0.0.19
func (c *Constraint) InspectDownstream(ctx context.Context) error
func (*Constraint) InspectUpstream ¶ added in v0.0.19
func (c *Constraint) InspectUpstream(ctx context.Context) error
TiDB TiCDC index-value dispatcher update event compatibility (https://docs.pingcap.com/zh/tidb/dev/ticdc-split-update-behavior):
v6.5 series [>=v6.5.5]: TiDB versions from v6.5.5 up to, but not including, v7.0.0 are supported.
v7 and above [>=v7.1.2]: TiDB versions from v7.1.2 onward are supported.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a specific consumer
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup represents the consumer manager
func NewConsumerGroup ¶
func NewConsumerGroup( ctx context.Context, task *task.Task, schemaRoute *rule.SchemaRouteRule, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, consumeTables []string, param *pb.CdcConsumeParam, partitions []int, dbTypeT string, databaseS, databaseT database.IDatabase) *ConsumerGroup
func (*ConsumerGroup) AppendDDL ¶ added in v0.0.19
func (cg *ConsumerGroup) AppendDDL(key *DDLChangedEvent, value int)
func (*ConsumerGroup) Cancel ¶ added in v0.0.19
func (cg *ConsumerGroup) Cancel(partition int) error
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) CommitMessage ¶
func (cg *ConsumerGroup) CommitMessage(c *Consumer, msg kafka.Message) error
func (*ConsumerGroup) ConsumeMessage ¶ added in v0.0.19
func (cg *ConsumerGroup) ConsumeMessage(c *Consumer) (bool, error)
func (*ConsumerGroup) Consumers ¶ added in v0.0.19
func (cg *ConsumerGroup) Consumers() map[int]*Consumer
func (*ConsumerGroup) Coordinators ¶ added in v0.0.19
func (cg *ConsumerGroup) Coordinators(key *DDLChangedEvent) int
func (*ConsumerGroup) Coroutines ¶ added in v0.0.19
func (cg *ConsumerGroup) Coroutines(c *Consumer)
func (*ConsumerGroup) Get ¶ added in v0.0.19
func (cg *ConsumerGroup) Get(partition int) (*Consumer, error)
func (*ConsumerGroup) GetDDL ¶ added in v0.0.19
func (cg *ConsumerGroup) GetDDL() *DDLChangedEvent
func (*ConsumerGroup) IsEventDDLFlush ¶ added in v0.0.19
func (cg *ConsumerGroup) IsEventDDLFlush(key *DDLChangedEvent) bool
func (*ConsumerGroup) IsEventResolvedFlush ¶ added in v0.0.19
func (cg *ConsumerGroup) IsEventResolvedFlush(resolvedTs uint64) bool
func (*ConsumerGroup) ObsoleteMessages ¶ added in v0.0.19
func (cg *ConsumerGroup) ObsoleteMessages(currentTs, nowTs uint64) bool
ObsoleteMessages filters obsolete messages in two situations:
1. At startup, events that have already been consumed and synchronized are filtered out.
2. During operation, the DDL/ResolvedTs event of the corresponding partition is refreshed. CDC guarantees that all events before a DDL/ResolvedTs event have already been sent, so no later event should carry a Ts smaller than that of the DDL/ResolvedTs event.
func (*ConsumerGroup) Partitions ¶ added in v0.0.19
func (cg *ConsumerGroup) Partitions(key *DDLChangedEvent) []int
func (*ConsumerGroup) Pause ¶
func (cg *ConsumerGroup) Pause(partition int) error
func (*ConsumerGroup) PopDDL ¶ added in v0.0.19
func (cg *ConsumerGroup) PopDDL()
func (*ConsumerGroup) RemoveDDL ¶ added in v0.0.19
func (cg *ConsumerGroup) RemoveDDL(key *DDLChangedEvent) error
func (*ConsumerGroup) Resume ¶
func (cg *ConsumerGroup) Resume(partition int) error
func (*ConsumerGroup) Run ¶
func (cg *ConsumerGroup) Run() error
func (*ConsumerGroup) Set ¶ added in v0.0.19
func (cg *ConsumerGroup) Set(partition int, c *Consumer)
func (*ConsumerGroup) Start ¶
func (cg *ConsumerGroup) Start(partition int) error
func (*ConsumerGroup) Stop ¶
func (cg *ConsumerGroup) Stop(partition int) error
func (*ConsumerGroup) WriteMessage ¶
func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)
WriteMessage decodes a Kafka message into events.
type ConvColumn ¶
type ConvColumn struct { ColumnName string `json:"columnName"` ColumnType string `json:"columnType"` ColumnFlag ColumnFlagType `json:"columnFlag"` ColumnValue interface{} `json:"columnValue"` }
type DDLChangedEvent ¶
type DDLChangedEvent struct { CommitTs uint64 `json:"commitTs"` SchemaName string `json:"schemaName"` TableName string `json:"tableName"` DdlQuery string `json:"ddlQuery"` DdlType DDLType `json:"ddlType"` }
func MsgConvDDLEvent ¶
func MsgConvDDLEvent(caseFieldRuleS string, key *MessageEventKey, value *MessageDDLEventValue) *DDLChangedEvent
func (*DDLChangedEvent) String ¶
func (d *DDLChangedEvent) String() string
type DDLChangedEvents ¶
type DDLChangedEvents []*DDLChangedEvent
DDLChangedEvents is a slice of DDLChangedEvent and implements the sort.Interface interface.
func (*DDLChangedEvents) Add ¶
func (d *DDLChangedEvents) Add(event *DDLChangedEvent)
Add adds a new DDLChangedEvent and keeps the DDL events sorted by CommitTs.
func (DDLChangedEvents) Len ¶
func (d DDLChangedEvents) Len() int
func (DDLChangedEvents) Less ¶
func (d DDLChangedEvents) Less(i, j int) bool
func (DDLChangedEvents) Swap ¶
func (d DDLChangedEvents) Swap(i, j int)
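As a rough illustration of how a slice type can implement sort.Interface and stay ordered by CommitTs on every Add, here is a minimal sketch. The struct is trimmed to two fields, and the append-then-sort strategy is an assumption; the package's own Add may keep the order some other way.

```go
package main

import (
	"fmt"
	"sort"
)

// DDLChangedEvent is a trimmed local copy of the documented struct.
type DDLChangedEvent struct {
	CommitTs uint64
	DdlQuery string
}

// DDLChangedEvents implements sort.Interface, ordered by CommitTs.
type DDLChangedEvents []*DDLChangedEvent

func (d DDLChangedEvents) Len() int           { return len(d) }
func (d DDLChangedEvents) Less(i, j int) bool { return d[i].CommitTs < d[j].CommitTs }
func (d DDLChangedEvents) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }

// Add appends the event and re-sorts. sort.Stable keeps arrival order for
// events that share a CommitTs. This strategy is an assumption.
func (d *DDLChangedEvents) Add(event *DDLChangedEvent) {
	*d = append(*d, event)
	sort.Stable(*d)
}

func main() {
	var ddls DDLChangedEvents
	ddls.Add(&DDLChangedEvent{CommitTs: 30, DdlQuery: "alter table t1 add column c int"})
	ddls.Add(&DDLChangedEvent{CommitTs: 10, DdlQuery: "create table t1 (id int primary key)"})
	ddls.Add(&DDLChangedEvent{CommitTs: 20, DdlQuery: "create index i1 on t1 (id)"})
	for _, e := range ddls {
		fmt.Println(e.CommitTs, e.DdlQuery)
	}
}
```

Re-sorting on each Add is simple and fine for short DDL queues; a binary-search insert would avoid the full sort if the queue grew large.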
type DDLType ¶
type DDLType int
DDLType is the type of a DDL action.
const ( DDLNone DDLType = 0 DDLCreateSchema DDLType = 1 DDLDropSchema DDLType = 2 DDLCreateTable DDLType = 3 DDLDropTable DDLType = 4 DDLAddColumn DDLType = 5 DDLDropColumn DDLType = 6 DDLAddIndex DDLType = 7 DDLDropIndex DDLType = 8 DDLAddForeignKey DDLType = 9 DDLDropForeignKey DDLType = 10 DDLTruncateTable DDLType = 11 DDLModifyColumn DDLType = 12 DDLRebaseAutoID DDLType = 13 DDLRenameTable DDLType = 14 DDLSetDefaultValue DDLType = 15 DDLShardRowID DDLType = 16 DDLModifyTableComment DDLType = 17 DDLRenameIndex DDLType = 18 DDLAddTablePartition DDLType = 19 DDLDropTablePartition DDLType = 20 DDLCreateView DDLType = 21 DDLModifyTableCharsetAndCollate DDLType = 22 DDLTruncateTablePartition DDLType = 23 DDLDropView DDLType = 24 DDLRecoverTable DDLType = 25 DDLModifySchemaCharsetAndCollate DDLType = 26 DDLLockTable DDLType = 27 DDLUnlockTable DDLType = 28 DDLRepairTable DDLType = 29 DDLSetTiFlashReplica DDLType = 30 DDLUpdateTiFlashReplicaStatus DDLType = 31 DDLAddPrimaryKey DDLType = 32 DDLDropPrimaryKey DDLType = 33 DDLCreateSequence DDLType = 34 DDLAlterSequence DDLType = 35 DDLDropSequence DDLType = 36 DDLAddColumns DDLType = 37 // Deprecated, we use DDLMultiSchemaChange instead. DDLDropColumns DDLType = 38 // Deprecated, we use DDLMultiSchemaChange instead. 
DDLModifyTableAutoIDCache DDLType = 39 DDLRebaseAutoRandomBase DDLType = 40 DDLAlterIndexVisibility DDLType = 41 DDLExchangeTablePartition DDLType = 42 DDLAddCheckConstraint DDLType = 43 DDLDropCheckConstraint DDLType = 44 DDLAlterCheckConstraint DDLType = 45 DDLRenameTables DDLType = 47 DDLAlterTableAttributes DDLType = 49 DDLAlterTablePartitionAttributes DDLType = 50 DDLCreatePlacementPolicy DDLType = 51 DDLAlterPlacementPolicy DDLType = 52 DDLDropPlacementPolicy DDLType = 53 DDLAlterTablePartitionPlacement DDLType = 54 DDLModifySchemaDefaultPlacement DDLType = 55 DDLAlterTablePlacement DDLType = 56 DDLAlterCacheTable DDLType = 57 // not used DDLAlterTableStatsOptions DDLType = 58 DDLAlterNoCacheTable DDLType = 59 DDLCreateTables DDLType = 60 DDLMultiSchemaChange DDLType = 61 DDLFlashbackCluster DDLType = 62 DDLRecoverSchema DDLType = 63 DDLReorganizePartition DDLType = 64 DDLAlterTTLInfo DDLType = 65 DDLAlterTTLRemove DDLType = 67 DDLCreateResourceGroup DDLType = 68 DDLAlterResourceGroup DDLType = 69 DDLDropResourceGroup DDLType = 70 DDLAlterTablePartitioning DDLType = 71 DDLRemovePartitioning DDLType = 72 DDLAddVectorIndex DDLType = 73 )
List DDL actions.
type EventGroup ¶
type EventGroup struct {
// contains filtered or unexported fields
}
EventGroup stores DDL and DML change event messages.
func (*EventGroup) Append ¶
func (g *EventGroup) Append(e *RowChangedEvent)
Append will append an event to event groups.
func (*EventGroup) DDLCommitTs ¶
func (g *EventGroup) DDLCommitTs(ddlCommitTs uint64) []*RowChangedEvent
DDLCommitTs returns all events strictly < ddlCommitTs
func (*EventGroup) OrderSortedCommitTs ¶ added in v0.0.19
func (g *EventGroup) OrderSortedCommitTs() []uint64
OrderSortedCommitTs extracts and sorts all CommitTs values.
func (*EventGroup) RemoveDDLCommitTs ¶ added in v0.0.19
func (g *EventGroup) RemoveDDLCommitTs(ddlCommitTs uint64)
RemoveDDLCommitTs removes the DDL event whose CommitTs equals ddlCommitTs.
func (*EventGroup) ResolvedTs ¶
func (g *EventGroup) ResolvedTs(resolveTs uint64) []*RowChangedEvent
ResolvedTs will get events whose CommitTs is less than or equal to resolveTs, and at the same time remove events whose CommitTs is less than or equal to resolveTs from the original queue
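The "return and remove everything at or below resolveTs" behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the internal `events` slice, the sorted insert in Append, and the trimmed RowChangedEvent struct are all hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// RowChangedEvent is a trimmed local stand-in for the documented struct.
type RowChangedEvent struct {
	CommitTs  uint64
	TableName string
}

// EventGroup keeps events sorted by CommitTs (assumed internal layout).
type EventGroup struct {
	events []*RowChangedEvent
}

// Append inserts e at its sorted position so events stay ordered by CommitTs.
func (g *EventGroup) Append(e *RowChangedEvent) {
	i := sort.Search(len(g.events), func(i int) bool {
		return g.events[i].CommitTs > e.CommitTs
	})
	g.events = append(g.events, nil)
	copy(g.events[i+1:], g.events[i:])
	g.events[i] = e
}

// ResolvedTs returns the events with CommitTs <= resolveTs and removes them
// from the group.
func (g *EventGroup) ResolvedTs(resolveTs uint64) []*RowChangedEvent {
	i := sort.Search(len(g.events), func(i int) bool {
		return g.events[i].CommitTs > resolveTs
	})
	// The full slice expression caps the result so that a caller appending
	// to it cannot overwrite the events that remain in the group.
	out := g.events[:i:i]
	g.events = g.events[i:]
	return out
}

func main() {
	g := &EventGroup{}
	for _, ts := range []uint64{10, 30, 20} {
		g.Append(&RowChangedEvent{CommitTs: ts, TableName: "t1"})
	}
	flushed := g.ResolvedTs(20)
	fmt.Println(len(flushed), len(g.events)) // 2 1
}
```

A DDLCommitTs-style lookup would differ only in the comparison: it selects events strictly below the given timestamp.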
type MessageDDLEventValue ¶
func (*MessageDDLEventValue) Decode ¶
func (m *MessageDDLEventValue) Decode(data []byte) error
func (*MessageDDLEventValue) Encode ¶
func (m *MessageDDLEventValue) Encode() ([]byte, error)
type MessageEventKey ¶
type MessageEventKey struct { CommitTs uint64 `json:"ts"` SchemaName string `json:"scm,omitempty"` TableName string `json:"tbl,omitempty"` RowID int64 `json:"rid,omitempty"` MsgEventType MsgEventType `json:"t"` }
func (*MessageEventKey) Decode ¶
func (m *MessageEventKey) Decode(data []byte) error
Decode decodes a message key from a byte slice.
func (*MessageEventKey) Encode ¶
func (m *MessageEventKey) Encode() ([]byte, error)
Encode encodes the message key to a byte slice.
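To make the key layout concrete, the sketch below decodes one of the sample keys shown under BatchDecoder into a local copy of MessageEventKey. It uses encoding/json purely for illustration; whether the package's Decode and Encode use the standard library or another JSON codec is not stated here, and `decodeKey` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MsgEventType mirrors the documented message type enumeration.
type MsgEventType int

const (
	MsgEventTypeUnknown MsgEventType = iota
	MsgEventTypeRow
	MsgEventTypeDDL
	MsgEventTypeResolved
)

// MessageEventKey is a local copy of the documented struct and JSON tags.
type MessageEventKey struct {
	CommitTs     uint64       `json:"ts"`
	SchemaName   string       `json:"scm,omitempty"`
	TableName    string       `json:"tbl,omitempty"`
	RowID        int64        `json:"rid,omitempty"`
	MsgEventType MsgEventType `json:"t"`
}

// decodeKey parses a JSON-encoded message key (hypothetical helper).
func decodeKey(data []byte) (*MessageEventKey, error) {
	var k MessageEventKey
	if err := json.Unmarshal(data, &k); err != nil {
		return nil, err
	}
	return &k, nil
}

func main() {
	raw := []byte(`{"ts":454376015366979596,"scm":"marvin","tbl":"t1","rid":1,"t":1}`)
	k, err := decodeKey(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(k.SchemaName, k.TableName, k.RowID, k.MsgEventType == MsgEventTypeRow)
}
```

Because `scm`, `tbl`, and `rid` are tagged `omitempty`, a key that carries no schema, table, or row ID would omit those fields entirely when encoded.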
type MessageRowEventValue ¶
type MessageRowEventValue struct {
	// Update events are uniformly split into Delete/Insert; Before corresponds to Delete.
	// If the before value exists, Upsert corresponds to an Update event; otherwise Upsert corresponds to an Insert event.
	Upsert map[string]Column `json:"u,omitempty"`
	Before map[string]Column `json:"p,omitempty"`
	Delete map[string]Column `json:"d,omitempty"`
}
func (*MessageRowEventValue) Decode ¶
func (m *MessageRowEventValue) Decode(caseFieldRuleS string, data []byte) error
func (*MessageRowEventValue) Encode ¶
func (m *MessageRowEventValue) Encode() ([]byte, error)
type Metadata ¶ added in v0.0.19
type Metadata struct { TaskName string TaskFlow string TaskMode string SchemaNameS string SchemaNameT string TaskTables []string TableThread int DatabaseT database.IDatabase TableRoutes []*rule.TableRouteRule CaseFieldRuleS string CaseFieldRuleT string }
func (*Metadata) GenDownstream ¶ added in v0.0.19
type MetadataCache ¶ added in v0.0.19
type MetadataCache struct {
// contains filtered or unexported fields
}
func NewMetadataCache ¶ added in v0.0.19
func NewMetadataCache() *MetadataCache
func (*MetadataCache) All ¶ added in v0.0.19
func (m *MetadataCache) All() string
All returns all of the entries in the cache.
func (*MetadataCache) Build ¶ added in v0.0.19
func (m *MetadataCache) Build(schemaName, tableName string) string
func (*MetadataCache) Delete ¶ added in v0.0.19
func (m *MetadataCache) Delete(schemaName, tableName string)
Delete removes the metadata for a given schema and table name.
func (*MetadataCache) DeleteSchema ¶ added in v0.0.19
func (m *MetadataCache) DeleteSchema(schemaName string)
func (*MetadataCache) Get ¶ added in v0.0.19
func (m *MetadataCache) Get(schemaName, tableName string) (*metadata, bool)
Get retrieves the metadata for a given schema and table name.
func (*MetadataCache) Set ¶ added in v0.0.19
func (m *MetadataCache) Set(schemaName, tableName string, metadata *metadata)
Set sets or updates the metadata for a given schema and table name.
func (*MetadataCache) Size ¶ added in v0.0.19
func (m *MetadataCache) Size() int
Size returns the number of entries in the cache.
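For readers unfamiliar with this kind of cache, the sketch below shows one plausible shape: a mutex-guarded map keyed by schema and table. Everything inside it is an assumption. The package's `metadata` type is unexported and not documented here, so `tableMeta` is a hypothetical stand-in, and the struct key replaces whatever string key Build produces.

```go
package main

import (
	"fmt"
	"sync"
)

// tableMeta is a hypothetical stand-in for the unexported metadata type.
type tableMeta struct {
	ColumnTypes map[string]string // column name -> downstream type
}

// cacheKey identifies a table by schema and table name (assumed layout).
type cacheKey struct{ schema, table string }

// MetadataCache is a concurrency-safe cache of per-table metadata.
type MetadataCache struct {
	mu      sync.RWMutex
	entries map[cacheKey]*tableMeta
}

func NewMetadataCache() *MetadataCache {
	return &MetadataCache{entries: make(map[cacheKey]*tableMeta)}
}

// Get retrieves the metadata for a schema and table name.
func (m *MetadataCache) Get(schemaName, tableName string) (*tableMeta, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	md, ok := m.entries[cacheKey{schemaName, tableName}]
	return md, ok
}

// Set stores or updates the metadata for a schema and table name.
func (m *MetadataCache) Set(schemaName, tableName string, md *tableMeta) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.entries[cacheKey{schemaName, tableName}] = md
}

// DeleteSchema removes every entry that belongs to the given schema.
func (m *MetadataCache) DeleteSchema(schemaName string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for k := range m.entries {
		if k.schema == schemaName {
			delete(m.entries, k)
		}
	}
}

// Size returns the number of entries in the cache.
func (m *MetadataCache) Size() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.entries)
}

func main() {
	c := NewMetadataCache()
	c.Set("marvin", "t1", &tableMeta{ColumnTypes: map[string]string{"val": "CLOB"}})
	c.Set("marvin", "t2", &tableMeta{})
	c.Set("other", "t1", &tableMeta{})
	c.DeleteSchema("marvin")
	fmt.Println(c.Size()) // 1
}
```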
type MsgEventType ¶
type MsgEventType int
MsgEventType is the type of message, which is used by MqSink and RedoLog.
const ( // MsgEventTypeUnknown is unknown type of message key MsgEventTypeUnknown MsgEventType = iota // MsgEventTypeRow is row type of message key MsgEventTypeRow // MsgEventTypeDDL is ddl type of message key MsgEventTypeDDL // MsgEventTypeResolved is resolved type of message key MsgEventTypeResolved )
type RowChangedEvent ¶
type RowChangedEvent struct { SchemaName string `json:"schemaName"` TableName string `json:"tableName"` QueryType string `json:"queryType"` CommitTs uint64 `json:"commitTS"` IsDDL bool `json:"isDDL"` DdlQuery string `json:"ddlQuery"` // The table synchronized by TiCDC needs to have at least one valid index. The definition of a valid index is as follows: // 1, the primary key (PRIMARY KEY) is a valid index. // 2, each column in the unique index (UNIQUE INDEX) is explicitly defined as NOT NULL in the table structure and there are no virtual generated columns (VIRTUAL GENERATED COLUMNS). // Data synchronization TiCDC will select a valid index as the Handle Index. The HandleKeyFlag of the columns included in the Handle Index is set to 1. ValidUniqColumns map[string]interface{} `json:"validUniqColumns"` // Represents all field names, but Kafka information does not carry field offsets, and is not guaranteed to be strictly consistent with the order in which the fields are created in the table structure. Sort by field name Columns []*columnAttr `json:"columns"` ColumnType map[string]string `json:"columnType"` NewColumnData map[string]interface{} `json:"newColumnData"` // Only when this message is generated by an Update type event, record the name of each column and the data value before Update OldColumnData map[string]interface{} `json:"oldColumnData"` }
RowChangedEvent stores a DDL or DML event.
func MsgConvRowChangedEvent ¶
func MsgConvRowChangedEvent(key *MessageEventKey, value *MessageRowEventValue) (*RowChangedEvent, error)
func (*RowChangedEvent) Delete ¶
func (e *RowChangedEvent) Delete( dbTypeT string, schemaNameT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{}, error)
func (*RowChangedEvent) Insert ¶ added in v0.0.19
func (e *RowChangedEvent) Insert( dbTypeT string, schemaNameT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{}, error)
func (*RowChangedEvent) String ¶
func (e *RowChangedEvent) String() string
type RowEventDecoder ¶
type RowEventDecoder interface {
	// AddKeyValue adds the received key and value to the decoder.
	// It should be called before `HasNext`.
	// The decoder decodes the key and value into the event format.
	AddKeyValue(key, value []byte) error
	// HasNext returns
	// 1. the type of the next event
	// 2. a bool indicating whether a next event exists
	// 3. an error
	HasNext() (MsgEventType, bool, error)
	// NextResolvedEvent returns the next resolved event if it exists
	NextResolvedEvent() (uint64, error)
	// NextRowChangedEvent returns the next row changed event if it exists
	NextRowChangedEvent() (*RowChangedEvent, error)
	// NextDDLEvent returns the next DDL event if it exists
	NextDDLEvent() (*DDLChangedEvent, error)
}
RowEventDecoder is an abstraction over event decoders. This interface is currently used only for testing.
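The interface comments imply a calling pattern: feed one key/value pair with AddKeyValue, then loop on HasNext and call the Next method that matches the reported type. The sketch below shows that loop against a local copy of the interface. The event structs are trimmed, and `drain` and `stubDecoder` are hypothetical; the stub only replays a fixed queue of event types and does no real decoding.

```go
package main

import (
	"errors"
	"fmt"
)

type MsgEventType int

const (
	MsgEventTypeUnknown MsgEventType = iota
	MsgEventTypeRow
	MsgEventTypeDDL
	MsgEventTypeResolved
)

// Trimmed local stand-ins for the documented event structs.
type RowChangedEvent struct {
	TableName string
	CommitTs  uint64
}
type DDLChangedEvent struct {
	DdlQuery string
	CommitTs uint64
}

// RowEventDecoder is a local copy of the documented interface.
type RowEventDecoder interface {
	AddKeyValue(key, value []byte) error
	HasNext() (MsgEventType, bool, error)
	NextResolvedEvent() (uint64, error)
	NextRowChangedEvent() (*RowChangedEvent, error)
	NextDDLEvent() (*DDLChangedEvent, error)
}

// drain feeds one key/value pair to the decoder and reads every event out of
// it, returning a short description of each (hypothetical helper).
func drain(d RowEventDecoder, key, value []byte) ([]string, error) {
	if err := d.AddKeyValue(key, value); err != nil {
		return nil, err
	}
	var seen []string
	for {
		tp, ok, err := d.HasNext()
		if err != nil {
			return seen, err
		}
		if !ok {
			return seen, nil
		}
		switch tp {
		case MsgEventTypeRow:
			e, err := d.NextRowChangedEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, fmt.Sprintf("row %s@%d", e.TableName, e.CommitTs))
		case MsgEventTypeDDL:
			e, err := d.NextDDLEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, fmt.Sprintf("ddl %q@%d", e.DdlQuery, e.CommitTs))
		case MsgEventTypeResolved:
			ts, err := d.NextResolvedEvent()
			if err != nil {
				return seen, err
			}
			seen = append(seen, fmt.Sprintf("resolved@%d", ts))
		default:
			return seen, errors.New("unknown message event type")
		}
	}
}

// stubDecoder is a hypothetical in-memory decoder that replays a fixed queue
// of event types; each Next call consumes the head of the queue.
type stubDecoder struct{ queue []MsgEventType }

func (s *stubDecoder) pop() { s.queue = s.queue[1:] }

func (s *stubDecoder) AddKeyValue(key, value []byte) error { return nil }
func (s *stubDecoder) HasNext() (MsgEventType, bool, error) {
	if len(s.queue) == 0 {
		return MsgEventTypeUnknown, false, nil
	}
	return s.queue[0], true, nil
}
func (s *stubDecoder) NextResolvedEvent() (uint64, error) { s.pop(); return 100, nil }
func (s *stubDecoder) NextRowChangedEvent() (*RowChangedEvent, error) {
	s.pop()
	return &RowChangedEvent{TableName: "t1", CommitTs: 90}, nil
}
func (s *stubDecoder) NextDDLEvent() (*DDLChangedEvent, error) {
	s.pop()
	return &DDLChangedEvent{DdlQuery: "create table t1 (id int)", CommitTs: 80}, nil
}

func main() {
	d := &stubDecoder{queue: []MsgEventType{MsgEventTypeDDL, MsgEventTypeRow, MsgEventTypeResolved}}
	events, err := drain(d, nil, nil)
	fmt.Println(events, err)
}
```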
func NewBatchDecoder ¶
func NewBatchDecoder(caseFieldRuleS, msgCompresion string) RowEventDecoder
NewBatchDecoder creates a new BatchDecoder.