Documentation ¶
Overview ¶
Copyright © 2020 Marvin
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func QuoteSchemaTable(schema string, table string) string
- type Constraint
- type Consumer
- type ConsumerGroup
- func (cg *ConsumerGroup) AppendDDL(key *DDLChangedEvent, value int)
- func (cg *ConsumerGroup) Cancel(partition int) error
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) CommitMessage(c *Consumer, msg kafka.Message) error
- func (cg *ConsumerGroup) ConsumeMessage(c *Consumer) (bool, error)
- func (cg *ConsumerGroup) Consumers() map[int]*Consumer
- func (cg *ConsumerGroup) Coordinators(key *DDLChangedEvent) int
- func (cg *ConsumerGroup) Coroutines(c *Consumer)
- func (cg *ConsumerGroup) Get(partition int) (*Consumer, error)
- func (cg *ConsumerGroup) GetDDL() *DDLChangedEvent
- func (cg *ConsumerGroup) IsEventDDLFlush(key *DDLChangedEvent) bool
- func (cg *ConsumerGroup) ObsoleteMessages(currentTs, nowTs int64) bool
- func (cg *ConsumerGroup) Partitions(key *DDLChangedEvent) []int
- func (cg *ConsumerGroup) Pause(partition int) error
- func (cg *ConsumerGroup) PopDDL()
- func (cg *ConsumerGroup) RemoveDDL(key *DDLChangedEvent) error
- func (cg *ConsumerGroup) Resume(partition int) error
- func (cg *ConsumerGroup) Run() error
- func (cg *ConsumerGroup) Set(partition int, c *Consumer)
- func (cg *ConsumerGroup) Start(partition int) error
- func (cg *ConsumerGroup) Stop(partition int) error
- func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)
- type DDLChangedEvent
- type DDLChangedEvents
- type Decoder
- type EventGroup
- type Message
- type Metadata
- type MetadataCache
- func (m *MetadataCache) All() string
- func (m *MetadataCache) Build(schemaName, tableName string) string
- func (m *MetadataCache) Delete(schemaName, tableName string)
- func (m *MetadataCache) Get(schemaName, tableName string) (*metadata, bool)
- func (m *MetadataCache) GetTimezone() string
- func (m *MetadataCache) Set(schemaName, tableName string, metadata *metadata)
- func (m *MetadataCache) SetTimezone(timeZone string)
- func (m *MetadataCache) Size() int
- type RowChangedEvent
- func (e *RowChangedEvent) Delete(dbTypeS, dbTypeT, schemaNameT string, tableRoute []*rule.TableRouteRule, ...) (string, []interface{}, error)
- func (e *RowChangedEvent) DeleteFromObMySQL(dbTypeT string, schemaNameT string, tableRoute []*rule.TableRouteRule, ...) (string, []interface{}, error)
- func (e *RowChangedEvent) Insert(dbTypeS, dbTypeT, schemaNameT string, tableRoute []*rule.TableRouteRule, ...) (string, []interface{}, error)
- func (e *RowChangedEvent) InsertFromObMySQL(dbTypeT, schemaNameT string, tableRoute []*rule.TableRouteRule, ...) (string, []interface{}, error)
- func (e *RowChangedEvent) String() string
- type RowEventDecoder
Constants ¶
const (
    MsgRecordTypeInsert    = "INSERT"
    MsgRecordTypeUpdate    = "UPDATE"
    MsgRecordTypeDelete    = "DELETE"
    MsgRecordTypeHeartbeat = "HEARTBEAT"
    MsgRecordTypeDDL       = "DDL"
    MsgRecordTypeROW       = "ROW"
)
Variables ¶
var (
    DDLCreateTable = "CREATE TABLE"
    DDLDropTable   = "DROP TABLE"
    DDLAlterTable  = "ALTER TABLE"
    DDLRenameTable = "RENAME TABLE"
)
Functions ¶
func QuoteSchemaTable ¶
func QuoteSchemaTable(schema string, table string) string
QuoteSchemaTable quotes a schema-qualified table name.
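A minimal usage sketch (assumes import "fmt"; the schema and table names are hypothetical, and the exact quoting style is left to the implementation):

    func exampleQuoteSchemaTable() {
        // Build a quoted identifier for use in generated SQL text.
        quoted := QuoteSchemaTable("marvin", "orders") // hypothetical schema/table names
        fmt.Println("SELECT COUNT(1) FROM " + quoted)
    }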
Types ¶
type Constraint ¶
type Constraint struct {
    Task        *task.Task
    SchemaRoute *rule.SchemaRouteRule
    DbTypeT     string
    DatabaseS, DatabaseT database.IDatabase
    TaskTables  []string
    TableThread int
    TableRoutes []*rule.TableRouteRule
    // contains filtered or unexported fields
}
func (*Constraint) InspectDownstream ¶
func (c *Constraint) InspectDownstream(ctx context.Context) error
func (*Constraint) InspectUpstream ¶
func (c *Constraint) InspectUpstream(ctx context.Context) error
Compatible with the TiDB TiCDC index-value dispatcher split-update behavior (https://docs.pingcap.com/zh/tidb/dev/ticdc-split-update-behavior): in the v6.5 series, TiDB versions >= v6.5.5 and below v7.0.0 are fully supported; for v7 and above, TiDB versions >= v7.1.2 are fully supported.
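A hedged sketch of driving a Constraint check, assuming the exported fields above are enough to construct one (the unexported fields may require package-internal setup); the field values are illustrative:

    func exampleConstraint(ctx context.Context, t *task.Task, route *rule.SchemaRouteRule, dbS, dbT database.IDatabase) error {
        c := &Constraint{
            Task:        t,
            SchemaRoute: route,
            DbTypeT:     "mysql", // hypothetical target database type
            DatabaseS:   dbS,
            DatabaseT:   dbT,
            TaskTables:  []string{"ORDERS"}, // hypothetical table list
        }
        // Inspect the upstream side first, then the downstream side.
        if err := c.InspectUpstream(ctx); err != nil {
            return err
        }
        return c.InspectDownstream(ctx)
    }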
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a specific consumer
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup represents the consumer manager
func NewConsumerGroup ¶
func NewConsumerGroup(
    ctx context.Context,
    task *task.Task,
    schemaRoute *rule.SchemaRouteRule,
    tableRoute []*rule.TableRouteRule,
    columnRoute []*rule.ColumnRouteRule,
    consumeTables []string,
    param *pb.CdcConsumeParam,
    partitions []int,
    dbTypeS string,
    dbTypeT string,
    databaseS, databaseT database.IDatabase) *ConsumerGroup
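A sketch of constructing and running a ConsumerGroup, assuming the caller already has the task, routing rules, consume parameters, and database handles; the table names, partitions, and database types below are illustrative:

    func exampleRunConsumerGroup(
        ctx context.Context,
        t *task.Task,
        schemaRoute *rule.SchemaRouteRule,
        tableRoutes []*rule.TableRouteRule,
        columnRoutes []*rule.ColumnRouteRule,
        param *pb.CdcConsumeParam,
        dbS, dbT database.IDatabase,
    ) error {
        cg := NewConsumerGroup(ctx, t, schemaRoute, tableRoutes, columnRoutes,
            []string{"ORDERS"}, // hypothetical consume tables
            param,
            []int{0, 1, 2}, // hypothetical kafka partitions
            "tidb", "oracle", // hypothetical source and target database types
            dbS, dbT)
        defer cg.Close()
        // Run drives the per-partition consumers until it finishes or returns an error.
        return cg.Run()
    }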
func (*ConsumerGroup) AppendDDL ¶
func (cg *ConsumerGroup) AppendDDL(key *DDLChangedEvent, value int)
func (*ConsumerGroup) Cancel ¶
func (cg *ConsumerGroup) Cancel(partition int) error
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) CommitMessage ¶
func (cg *ConsumerGroup) CommitMessage(c *Consumer, msg kafka.Message) error
func (*ConsumerGroup) ConsumeMessage ¶
func (cg *ConsumerGroup) ConsumeMessage(c *Consumer) (bool, error)
func (*ConsumerGroup) Consumers ¶
func (cg *ConsumerGroup) Consumers() map[int]*Consumer
func (*ConsumerGroup) Coordinators ¶
func (cg *ConsumerGroup) Coordinators(key *DDLChangedEvent) int
func (*ConsumerGroup) Coroutines ¶
func (cg *ConsumerGroup) Coroutines(c *Consumer)
func (*ConsumerGroup) Get ¶
func (cg *ConsumerGroup) Get(partition int) (*Consumer, error)
func (*ConsumerGroup) GetDDL ¶
func (cg *ConsumerGroup) GetDDL() *DDLChangedEvent
func (*ConsumerGroup) IsEventDDLFlush ¶
func (cg *ConsumerGroup) IsEventDDLFlush(key *DDLChangedEvent) bool
func (*ConsumerGroup) ObsoleteMessages ¶
func (cg *ConsumerGroup) ObsoleteMessages(currentTs, nowTs int64) bool
func (*ConsumerGroup) Partitions ¶
func (cg *ConsumerGroup) Partitions(key *DDLChangedEvent) []int
func (*ConsumerGroup) Pause ¶
func (cg *ConsumerGroup) Pause(partition int) error
func (*ConsumerGroup) PopDDL ¶
func (cg *ConsumerGroup) PopDDL()
func (*ConsumerGroup) RemoveDDL ¶
func (cg *ConsumerGroup) RemoveDDL(key *DDLChangedEvent) error
func (*ConsumerGroup) Resume ¶
func (cg *ConsumerGroup) Resume(partition int) error
func (*ConsumerGroup) Run ¶
func (cg *ConsumerGroup) Run() error
func (*ConsumerGroup) Set ¶
func (cg *ConsumerGroup) Set(partition int, c *Consumer)
func (*ConsumerGroup) Start ¶
func (cg *ConsumerGroup) Start(partition int) error
func (*ConsumerGroup) Stop ¶
func (cg *ConsumerGroup) Stop(partition int) error
func (*ConsumerGroup) WriteMessage ¶
func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)
WriteMessage decodes a kafka message into an event.
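A sketch of handling one fetched kafka message: decode and apply it with WriteMessage, then commit its offset with CommitMessage. The meaning of the boolean result is an assumption of this sketch.

    func exampleHandleMessage(cg *ConsumerGroup, c *Consumer, msg kafka.Message) error {
        // Decode the kafka message into an event and apply it.
        written, err := cg.WriteMessage(c, msg)
        if err != nil {
            return err
        }
        if written {
            // Only commit the offset once the message has been handled (assumed semantics).
            return cg.CommitMessage(c, msg)
        }
        return nil
    }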
type DDLChangedEvent ¶
type DDLChangedEvent struct {
    CommitTs   uint64 `json:"commitTs"`
    SchemaName string `json:"schemaName"`
    TableName  string `json:"tableName"`
    DdlQuery   string `json:"ddlQuery"`
    DdlType    string `json:"ddlType"`
}
func (*DDLChangedEvent) String ¶
func (d *DDLChangedEvent) String() string
type DDLChangedEvents ¶
type DDLChangedEvents []*DDLChangedEvent
DDLChangedEvents is a slice of DDLChangedEvent that implements sort.Interface.
func (*DDLChangedEvents) Add ¶
func (d *DDLChangedEvents) Add(event *DDLChangedEvent)
Add appends a new DDLChangedEvent and keeps the DDLs sorted by CommitTs.
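A short sketch (assumes import "fmt"); the events and their CommitTs values are illustrative:

    func exampleDDLChangedEvents() {
        var ddls DDLChangedEvents
        // Events may arrive out of order; Add keeps the slice sorted by CommitTs.
        ddls.Add(&DDLChangedEvent{CommitTs: 200, SchemaName: "MARVIN", TableName: "T1",
            DdlQuery: "ALTER TABLE T1 ADD COLUMN C1 INT", DdlType: DDLAlterTable})
        ddls.Add(&DDLChangedEvent{CommitTs: 100, SchemaName: "MARVIN", TableName: "T1",
            DdlQuery: "CREATE TABLE T1 (ID INT)", DdlType: DDLCreateTable})
        for _, d := range ddls {
            fmt.Println(d.String()) // printed in ascending CommitTs order
        }
    }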
func (DDLChangedEvents) Len ¶
func (d DDLChangedEvents) Len() int
func (DDLChangedEvents) Less ¶
func (d DDLChangedEvents) Less(i, j int) bool
func (DDLChangedEvents) Swap ¶
func (d DDLChangedEvents) Swap(i, j int)
type Decoder ¶
type Decoder struct {
// contains filtered or unexported fields
}
Decoder decodes the bytes of a batch into the original messages.
func (*Decoder) AddKeyValue ¶
func (b *Decoder) AddKeyValue(key, value []byte) error
AddKeyValue implements the RowEventDecoder interface
func (*Decoder) NextDDLEvent ¶
func (b *Decoder) NextDDLEvent() (*DDLChangedEvent, error)
NextDDLEvent implements the RowEventDecoder interface
func (*Decoder) NextRowChangedEvent ¶
func (b *Decoder) NextRowChangedEvent() (*RowChangedEvent, error)
NextRowChangedEvent implements the RowEventDecoder interface
type EventGroup ¶
type EventGroup struct {
// contains filtered or unexported fields
}
EventGroup stores DDL and DML change event messages.
func (*EventGroup) Append ¶
func (g *EventGroup) Append(e *RowChangedEvent)
Append appends an event to the event group.
func (*EventGroup) DDLCommitTs ¶
func (g *EventGroup) DDLCommitTs(ddlCommitTs uint64) []*RowChangedEvent
DDLCommitTs returns all events with a commit timestamp strictly less than ddlCommitTs.
func (*EventGroup) OrderSortedEventCommitTs ¶
func (g *EventGroup) OrderSortedEventCommitTs() []*RowChangedEvent
OrderSortedEventCommitTs extracts all events and returns them sorted by commit timestamp.
func (*EventGroup) RemoveDDLCommitTs ¶
func (g *EventGroup) RemoveDDLCommitTs(ddlCommitTs uint64)
RemoveDDLCommitTs removes the events whose commit timestamp equals ddlCommitTs.
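A sketch of the intended flush flow, assuming the caller already holds an *EventGroup (no constructor is documented here): buffer row events with Append, take everything that committed strictly before a DDL with DDLCommitTs, then drop the entries at that timestamp with RemoveDDLCommitTs.

    func exampleFlushBeforeDDL(g *EventGroup, rows []*RowChangedEvent, ddlCommitTs uint64) []*RowChangedEvent {
        for _, e := range rows {
            g.Append(e)
        }
        flushable := g.DDLCommitTs(ddlCommitTs) // events with CommitTs < ddlCommitTs
        g.RemoveDDLCommitTs(ddlCommitTs)        // drop the events at ddlCommitTs
        return flushable
    }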
type Message ¶
type Message struct {
    /*
        DELETE only has prevStruct; INSERT and DDL only have postStruct;
        UPDATE has both prevStruct and postStruct; HEARTBEAT (a regular
        heartbeat message) has neither prevStruct nor postStruct.
    */
    // the table record kv image before the change
    PrevStruct map[string]interface{} `json:"prevStruct"`
    // the table record kv image after the change
    PostStruct map[string]interface{} `json:"postStruct"`
    Metadata   allMetadata            `json:"allMetaData"`
    // INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW
    RecordType string `json:"recordType"`
}
DefaultExtendColumnType JSON message (https://www.oceanbase.com/docs/enterprise-oms-doc-cn-1000000001781775): compared with the DEFAULT format, the DefaultExtendColumnType JSON message format adds a __light_type field to the postStruct image to indicate the data type of each field.
func (*Message) DecodeDDLChangedEvent ¶
func (m *Message) DecodeDDLChangedEvent(caseFieldRuleS string) *DDLChangedEvent
In OceanBase message records, the database name includes the tenant; the format is tenant.schema.
func (*Message) DecodeRowChangedEvent ¶
func (m *Message) DecodeRowChangedEvent(dbTypeS, caseFieldRuleS string) (*RowChangedEvent, error)
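A sketch of decoding a raw kafka value into a Message and then into the corresponding event. Unmarshalling the value as the JSON payload described above is an assumption of this sketch, and dbTypeS/caseFieldRuleS are passed through by the caller (assumes imports "encoding/json" and "fmt"):

    func exampleDecodeMessage(value []byte, dbTypeS, caseFieldRuleS string) error {
        var msg Message
        // assumption: the kafka value is the JSON payload documented above
        if err := json.Unmarshal(value, &msg); err != nil {
            return err
        }
        switch msg.RecordType {
        case MsgRecordTypeDDL:
            ddl := msg.DecodeDDLChangedEvent(caseFieldRuleS)
            fmt.Println(ddl.String())
        case MsgRecordTypeHeartbeat:
            // heartbeat messages carry no row image; nothing to apply
        default:
            row, err := msg.DecodeRowChangedEvent(dbTypeS, caseFieldRuleS)
            if err != nil {
                return err
            }
            fmt.Println(row.String())
        }
        return nil
    }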
type Metadata ¶
type Metadata struct {
    TaskName       string
    TaskFlow       string
    TaskMode       string
    DBTypeS        string
    SchemaNameS    string
    SchemaNameT    string
    TaskTables     []string
    TableThread    int
    TableRoutes    []*rule.TableRouteRule
    ColumnRoutes   []*rule.ColumnRouteRule
    CaseFieldRuleS string
    CaseFieldRuleT string
    DatabaseS      database.IDatabase
    DatabaseT      database.IDatabase
}
Metadata describes the database table metadata.
type MetadataCache ¶
type MetadataCache struct {
    Timezone string
    // contains filtered or unexported fields
}
func NewMetadataCache ¶
func NewMetadataCache() *MetadataCache
func (*MetadataCache) All ¶
func (m *MetadataCache) All() string
All returns all entries in the cache.
func (*MetadataCache) Build ¶
func (m *MetadataCache) Build(schemaName, tableName string) string
func (*MetadataCache) Delete ¶
func (m *MetadataCache) Delete(schemaName, tableName string)
Delete removes the metadata for a given schema and table name.
func (*MetadataCache) Get ¶
func (m *MetadataCache) Get(schemaName, tableName string) (*metadata, bool)
Get retrieves the metadata for a given schema and table name.
func (*MetadataCache) GetTimezone ¶
func (m *MetadataCache) GetTimezone() string
func (*MetadataCache) Set ¶
func (m *MetadataCache) Set(schemaName, tableName string, metadata *metadata)
Set sets or updates the metadata for a given schema and table name.
func (*MetadataCache) SetTimezone ¶
func (m *MetadataCache) SetTimezone(timeZone string)
SetTimezone sets the timezone used by entries in the cache.
func (*MetadataCache) Size ¶
func (m *MetadataCache) Size() int
Size returns the number of entries in the cache.
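A sketch of the cache's read-side API (Set takes an unexported *metadata, so entries are populated inside the package); the schema, table, and timezone values are illustrative, and assumes import "fmt":

    func exampleMetadataCache() {
        cache := NewMetadataCache()
        cache.SetTimezone("+08:00")            // hypothetical timezone value
        key := cache.Build("MARVIN", "ORDERS") // assumed to build the cache key for a schema/table pair
        fmt.Println(key, cache.GetTimezone(), cache.Size())

        if meta, ok := cache.Get("MARVIN", "ORDERS"); ok {
            _ = meta // use the cached metadata
        }
        cache.Delete("MARVIN", "ORDERS")
    }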
type RowChangedEvent ¶
type RowChangedEvent struct {
    SchemaName string `json:"schemaName"`
    TableName  string `json:"tableName"`
    QueryType  string `json:"queryType"`
    CommitTs   uint64 `json:"commitTS"`
    IsDDL      bool   `json:"isDDL"`
    DdlQuery   string `json:"ddlQuery"`
    // A table synchronized by oceanbase needs to have at least one valid index. A valid index is defined as follows:
    // 1. the primary key (PRIMARY KEY) is a valid index.
    // 2. each column in a unique index (UNIQUE INDEX) is explicitly defined as NOT NULL in the table structure, and the index contains no virtual generated columns (VIRTUAL GENERATED COLUMNS).
    ValidUniqColumns map[string]interface{} `json:"validUniqColumns"`
    ColumnType       map[string]string
    NewColumnData    map[string]interface{} `json:"newColumnData"`
    // Only populated when this message is generated by an UPDATE event: records the name of each column and its value before the update
    OldColumnData map[string]interface{} `json:"oldColumnData"`
}
func (*RowChangedEvent) Delete ¶
func (e *RowChangedEvent) Delete(dbTypeS, dbTypeT, schemaNameT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{}, error)
func (*RowChangedEvent) DeleteFromObMySQL ¶
func (e *RowChangedEvent) DeleteFromObMySQL(dbTypeT string, schemaNameT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{}, error)
func (*RowChangedEvent) Insert ¶
func (e *RowChangedEvent) Insert(dbTypeS, dbTypeT, schemaNameT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{}, error)
func (*RowChangedEvent) InsertFromObMySQL ¶
func (e *RowChangedEvent) InsertFromObMySQL(dbTypeT, schemaNameT string, tableRoute []*rule.TableRouteRule, columnRoute []*rule.ColumnRouteRule, caseFieldRuleT string) (string, []interface{}, error)
func (*RowChangedEvent) String ¶
func (e *RowChangedEvent) String() string
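A sketch of turning a row event into target SQL text and arguments. It assumes QueryType carries the record-type constants above, and executing through database/sql (import "database/sql") merely stands in for whatever the package actually writes to:

    func exampleApplyRowEvent(
        e *RowChangedEvent,
        dbTypeS, dbTypeT, schemaNameT string,
        tableRoutes []*rule.TableRouteRule,
        columnRoutes []*rule.ColumnRouteRule,
        caseFieldRuleT string,
        db *sql.DB,
    ) error {
        var (
            query string
            args  []interface{}
            err   error
        )
        switch e.QueryType {
        case MsgRecordTypeInsert, MsgRecordTypeUpdate:
            // treating UPDATE as an upsert-style INSERT is an assumption of this sketch
            query, args, err = e.Insert(dbTypeS, dbTypeT, schemaNameT, tableRoutes, columnRoutes, caseFieldRuleT)
        case MsgRecordTypeDelete:
            query, args, err = e.Delete(dbTypeS, dbTypeT, schemaNameT, tableRoutes, columnRoutes, caseFieldRuleT)
        default:
            return nil
        }
        if err != nil {
            return err
        }
        _, err = db.Exec(query, args...)
        return err
    }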
type RowEventDecoder ¶
type RowEventDecoder interface {
    // AddKeyValue adds the received key and value to the decoder;
    // it should be called before `HasNext`.
    // The decoder decodes the key and value into the event format.
    AddKeyValue(key, value []byte) error

    // HasNext returns
    // 1. the type of the next event
    // 2. whether a next event exists
    // 3. an error, if any
    HasNext() (string, bool, error)

    // NextRowChangedEvent returns the next row changed event, if it exists
    NextRowChangedEvent() (*RowChangedEvent, error)

    // NextDDLEvent returns the next DDL event, if it exists
    NextDDLEvent() (*DDLChangedEvent, error)
}
RowEventDecoder is an abstraction for event decoders. This interface is currently only used for testing.
func NewDecoder ¶
func NewDecoder(dbTypeS, caseFieldRuleS string) RowEventDecoder
NewDecoder creates a new Decoder.
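A sketch of the decode loop implied by the RowEventDecoder contract. Treating the event-type string returned by HasNext as one of the record-type constants above is an assumption of this sketch (assumes import "fmt"):

    func exampleDecodeBatch(dbTypeS, caseFieldRuleS string, key, value []byte) error {
        dec := NewDecoder(dbTypeS, caseFieldRuleS)
        if err := dec.AddKeyValue(key, value); err != nil {
            return err
        }
        for {
            eventType, ok, err := dec.HasNext()
            if err != nil {
                return err
            }
            if !ok {
                return nil // the batch is exhausted
            }
            switch eventType {
            case MsgRecordTypeDDL:
                ddl, err := dec.NextDDLEvent()
                if err != nil {
                    return err
                }
                fmt.Println(ddl.String())
            default:
                row, err := dec.NextRowChangedEvent()
                if err != nil {
                    return err
                }
                fmt.Println(row.String())
            }
        }
    }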