oceanbase

package
v0.0.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Copyright © 2020 Marvin

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
// Record type values, matching the set listed on Message.RecordType
// (INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW).
// Per the comment on Message, HEARTBEAT is the regular heartbeat message.
const (
	MsgRecordTypeInsert    = "INSERT"
	MsgRecordTypeUpdate    = "UPDATE"
	MsgRecordTypeDelete    = "DELETE"
	MsgRecordTypeHeartbeat = "HEARTBEAT"
	MsgRecordTypeDDL       = "DDL"
	MsgRecordTypeROW       = "ROW"
)

Variables

View Source
// DDL statement kinds, spelled as the leading keywords of the SQL statement.
// NOTE(review): these are declared with var, not const, so they are
// assignable by any importer. Presumably they are only ever read (e.g. as
// values for DDLChangedEvent.DdlType) — confirm before treating them as
// constants.
var (
	DDLCreateTable = "CREATE TABLE"
	DDLDropTable   = "DROP TABLE"
	DDLAlterTable  = "ALTER TABLE"
	DDLRenameTable = "RENAME TABLE"
)

Functions

func QuoteSchemaTable

func QuoteSchemaTable(schema string, table string) string

QuoteSchemaTable quotes a table name

Types

type Constraint

// Constraint groups a task with its routing rules and database handles.
// It is the receiver of Inspect, InspectUpstream and InspectDownstream.
//
// NOTE(review): the S/T suffixes used on field names presumably mean
// source (upstream) and target (downstream) — confirm against callers.
type Constraint struct {
	// Task is the task this constraint check belongs to.
	Task *task.Task
	// SchemaRoute is the schema-level route rule.
	SchemaRoute *rule.SchemaRouteRule
	// DbTypeT is the database type of the T side (presumably target — TODO confirm).
	DbTypeT string
	// DatabaseS and DatabaseT are the database handles for the S and T sides.
	DatabaseS, DatabaseT database.IDatabase
	// TaskTables lists table names for the task.
	TaskTables []string
	// TableThread is presumably the per-table concurrency — TODO confirm.
	TableThread int
	// TableRoutes holds the table-level route rules.
	TableRoutes []*rule.TableRouteRule
	// contains filtered or unexported fields
}

func (*Constraint) Inspect

func (c *Constraint) Inspect(ctx context.Context) error

func (*Constraint) InspectDownstream

func (c *Constraint) InspectDownstream(ctx context.Context) error

func (*Constraint) InspectUpstream

func (c *Constraint) InspectUpstream(ctx context.Context) error

TiDB TiCDC index-value dispatcher update-event compatibility (see https://docs.pingcap.com/zh/tidb/dev/ticdc-split-update-behavior). v6.5 series [>=v6.5.5]: TiDB versions from v6.5.5 up to, but not including, v7.0.0 are supported normally. v7 and above [>=v7.1.2]: all TiDB versions from v7.1.2 onward are supported normally.

type Consumer

// Consumer represents a specific consumer.
// All of its fields are unexported.
type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a specific consumer

type ConsumerGroup

// ConsumerGroup represents the consumer manager.
// It is constructed with NewConsumerGroup. Its methods address individual
// consumers by an int partition argument (Get, Set, Start, Stop, Pause,
// Resume, Cancel). All of its fields are unexported.
type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup represents the consumer manager

func NewConsumerGroup

func NewConsumerGroup(
	ctx context.Context,
	task *task.Task,
	schemaRoute *rule.SchemaRouteRule,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	consumeTables []string,
	param *pb.CdcConsumeParam,
	partitions []int,
	dbTypeS string,
	dbTypeT string,
	databaseS, databaseT database.IDatabase) *ConsumerGroup

func (*ConsumerGroup) AppendDDL

func (cg *ConsumerGroup) AppendDDL(key *DDLChangedEvent, value int)

func (*ConsumerGroup) Cancel

func (cg *ConsumerGroup) Cancel(partition int) error

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) CommitMessage

func (cg *ConsumerGroup) CommitMessage(c *Consumer, msg kafka.Message) error

func (*ConsumerGroup) ConsumeMessage

func (cg *ConsumerGroup) ConsumeMessage(c *Consumer) (bool, error)

func (*ConsumerGroup) Consumers

func (cg *ConsumerGroup) Consumers() map[int]*Consumer

func (*ConsumerGroup) Coordinators

func (cg *ConsumerGroup) Coordinators(key *DDLChangedEvent) int

func (*ConsumerGroup) Coroutines

func (cg *ConsumerGroup) Coroutines(c *Consumer)

func (*ConsumerGroup) Get

func (cg *ConsumerGroup) Get(partition int) (*Consumer, error)

func (*ConsumerGroup) GetDDL

func (cg *ConsumerGroup) GetDDL() *DDLChangedEvent

func (*ConsumerGroup) IsEventDDLFlush

func (cg *ConsumerGroup) IsEventDDLFlush(key *DDLChangedEvent) bool

func (*ConsumerGroup) ObsoleteMessages

func (cg *ConsumerGroup) ObsoleteMessages(currentTs, nowTs int64) bool

func (*ConsumerGroup) Partitions

func (cg *ConsumerGroup) Partitions(key *DDLChangedEvent) []int

func (*ConsumerGroup) Pause

func (cg *ConsumerGroup) Pause(partition int) error

func (*ConsumerGroup) PopDDL

func (cg *ConsumerGroup) PopDDL()

func (*ConsumerGroup) RemoveDDL

func (cg *ConsumerGroup) RemoveDDL(key *DDLChangedEvent) error

func (*ConsumerGroup) Resume

func (cg *ConsumerGroup) Resume(partition int) error

func (*ConsumerGroup) Run

func (cg *ConsumerGroup) Run() error

func (*ConsumerGroup) Set

func (cg *ConsumerGroup) Set(partition int, c *Consumer)

func (*ConsumerGroup) Start

func (cg *ConsumerGroup) Start(partition int) error

func (*ConsumerGroup) Stop

func (cg *ConsumerGroup) Stop(partition int) error

func (*ConsumerGroup) WriteMessage

func (cg *ConsumerGroup) WriteMessage(c *Consumer, msg kafka.Message) (bool, error)

WriteMessage decodes a Kafka message into an event.

type DDLChangedEvent

// DDLChangedEvent describes a single DDL change. It is JSON-encodable via
// the field tags below.
type DDLChangedEvent struct {
	// CommitTs is the commit ts of the DDL; DDLChangedEvents.Add keeps events
	// sorted by this value. NOTE(review): unit/encoding of the ts is not
	// visible here — confirm.
	CommitTs uint64 `json:"commitTs"`
	// SchemaName is the schema the DDL applies to.
	SchemaName string `json:"schemaName"`
	// TableName is the table the DDL applies to.
	TableName string `json:"tableName"`
	// DdlQuery is the DDL query text.
	DdlQuery string `json:"ddlQuery"`
	// DdlType is the kind of DDL (presumably one of the DDL* variables such as
	// DDLCreateTable — TODO confirm).
	DdlType string `json:"ddlType"`
}

func (*DDLChangedEvent) String

func (d *DDLChangedEvent) String() string

type DDLChangedEvents

// DDLChangedEvents is a slice of *DDLChangedEvent that implements
// sort.Interface through its Len, Less and Swap methods; Add inserts an event
// while keeping the slice sorted by CommitTs.
type DDLChangedEvents []*DDLChangedEvent

DDLChangedEvents is a slice of DDLChangedEvent and implements the sort.Interface interface.

func (*DDLChangedEvents) Add

func (d *DDLChangedEvents) Add(event *DDLChangedEvent)

Add a new DDLChangedEvent and keep the ddls sorted by CommitTs

func (DDLChangedEvents) Len

func (d DDLChangedEvents) Len() int

func (DDLChangedEvents) Less

func (d DDLChangedEvents) Less(i, j int) bool

func (DDLChangedEvents) Swap

func (d DDLChangedEvents) Swap(i, j int)

type Decoder

// Decoder decodes the bytes of a batch into the original messages.
// Its AddKeyValue, HasNext, NextRowChangedEvent and NextDDLEvent methods
// implement the RowEventDecoder interface. All of its fields are unexported.
type Decoder struct {
	// contains filtered or unexported fields
}

Decoder decodes the bytes of a batch into the original messages.

func (*Decoder) AddKeyValue

func (b *Decoder) AddKeyValue(key, value []byte) error

AddKeyValue implements the RowEventDecoder interface

func (*Decoder) HasNext

func (b *Decoder) HasNext() (string, bool, error)

HasNext implements the RowEventDecoder interface

func (*Decoder) NextDDLEvent

func (b *Decoder) NextDDLEvent() (*DDLChangedEvent, error)

NextDDLEvent implements the RowEventDecoder interface

func (*Decoder) NextRowChangedEvent

func (b *Decoder) NextRowChangedEvent() (*RowChangedEvent, error)

NextRowChangedEvent implements the RowEventDecoder interface

type EventGroup

// EventGroup stores DDL and DML change event messages.
// Create one with NewEventGroup and add events with Append.
// All of its fields are unexported.
type EventGroup struct {
	// contains filtered or unexported fields
}

EventGroup stores DDL and DML change event messages.

func NewEventGroup

func NewEventGroup() *EventGroup

NewEventGroup will create new event group.

func (*EventGroup) Append

func (g *EventGroup) Append(e *RowChangedEvent)

Append will append an event to event groups.

func (*EventGroup) DDLCommitTs

func (g *EventGroup) DDLCommitTs(ddlCommitTs uint64) []*RowChangedEvent

DDLCommitTs returns all events strictly < ddlCommitTs

func (*EventGroup) OrderSortedEventCommitTs

func (g *EventGroup) OrderSortedEventCommitTs() []*RowChangedEvent

OrderSortedEventCommitTs extracts and sorts all commits.

func (*EventGroup) RemoveDDLCommitTs

func (g *EventGroup) RemoveDDLCommitTs(ddlCommitTs uint64)

RemoveDDLCommitTs removes the DDL event equal to ddlCommitTs.

type Message

// Message is the JSON message record: the row images before and after a
// change, the record metadata, and the record type.
type Message struct {
	/*
		DELETE only has prevStruct, INSERT and DDL only have postStruct, UPDATE has both prevStruct and postStruct, HEARTBEAT (regular heartbeat message) has neither prevStruct nor postStruct.
	*/
	// the table record kv image before change
	PrevStruct map[string]interface{} `json:"prevStruct"`
	// the table record kv image after change
	PostStruct map[string]interface{} `json:"postStruct"`

	// Metadata is the record metadata; note the JSON key is "allMetaData".
	Metadata allMetadata `json:"allMetaData"`
	// INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW
	RecordType string `json:"recordType"`
}

See https://www.oceanbase.com/docs/enterprise-oms-doc-cn-1000000001781775 (DefaultExtendColumnType JSON message). The DefaultExtendColumnType JSON message format is based on the DEFAULT format and adds a field, __light_type, to the postStruct image to indicate the data type of each field.

func (*Message) DecodeDDLChangedEvent

func (m *Message) DecodeDDLChangedEvent(caseFieldRuleS string) *DDLChangedEvent

In an OceanBase message record, the database (db) name includes the tenant; its format is tenant.schema.

func (*Message) DecodeRowChangedEvent

func (m *Message) DecodeRowChangedEvent(dbTypeS, caseFieldRuleS string) (*RowChangedEvent, error)

func (*Message) String

func (m *Message) String() string

type Metadata

// Metadata carries the task settings, routing rules and database handles
// for the database table metadata; it is the receiver of GenMetadata,
// GenUpstream and GenDownstream.
//
// NOTE(review): the S/T suffixes presumably mean source (upstream) and
// target (downstream) — confirm against callers.
type Metadata struct {
	// Task identification and mode settings.
	TaskName       string
	TaskFlow       string
	TaskMode       string
	// DBTypeS is the database type of the S side.
	DBTypeS        string
	// SchemaNameS and SchemaNameT are the schema names on the S and T sides.
	SchemaNameS    string
	SchemaNameT    string
	// TaskTables lists table names for the task.
	TaskTables     []string
	// TableThread is presumably the per-table concurrency — TODO confirm.
	TableThread    int
	// TableRoutes and ColumnRoutes hold the table- and column-level route rules.
	TableRoutes    []*rule.TableRouteRule
	ColumnRoutes   []*rule.ColumnRouteRule
	// CaseFieldRuleS and CaseFieldRuleT are the case-field rules for the S and
	// T sides (presumably identifier case handling — TODO confirm).
	CaseFieldRuleS string
	CaseFieldRuleT string
	// DatabaseS and DatabaseT are the database handles for the S and T sides.
	DatabaseS      database.IDatabase
	DatabaseT      database.IDatabase
}

Metadata describes the database table metadata.

func (*Metadata) GenDownstream

func (m *Metadata) GenDownstream(ctx context.Context) error

func (*Metadata) GenMetadata

func (m *Metadata) GenMetadata(ctx context.Context) error

func (*Metadata) GenUpstream

func (m *Metadata) GenUpstream(ctx context.Context) error

type MetadataCache

// MetadataCache is a cache of table metadata entries addressed by schema
// name and table name (see Get, Set and Delete). Create one with
// NewMetadataCache.
type MetadataCache struct {
	// Timezone is the timezone string associated with the cache.
	// NOTE(review): GetTimezone/SetTimezone presumably read and write this
	// field — confirm; direct access bypasses whatever those methods do.
	Timezone string
	// contains filtered or unexported fields
}

func NewMetadataCache

func NewMetadataCache() *MetadataCache

func (*MetadataCache) All

func (m *MetadataCache) All() string

All returns all of the entries in the cache.

func (*MetadataCache) Build

func (m *MetadataCache) Build(schemaName, tableName string) string

func (*MetadataCache) Delete

func (m *MetadataCache) Delete(schemaName, tableName string)

Delete removes the metadata for a given schema and table name.

func (*MetadataCache) Get

func (m *MetadataCache) Get(schemaName, tableName string) (*metadata, bool)

Get retrieves the metadata for a given schema and table name.

func (*MetadataCache) GetTimezone

func (m *MetadataCache) GetTimezone() string

func (*MetadataCache) Set

func (m *MetadataCache) Set(schemaName, tableName string, metadata *metadata)

Set sets or updates the metadata for a given schema and table name.

func (*MetadataCache) SetTimezone

func (m *MetadataCache) SetTimezone(timeZone string)

SetTimezone sets the timezone of entries in the cache.

func (*MetadataCache) Size

func (m *MetadataCache) Size() int

Size returns the number of entries in the cache.

type RowChangedEvent

// RowChangedEvent describes one changed row (or a DDL, when IsDDL is set):
// where it happened, what kind of change it was, and the column data
// involved.
type RowChangedEvent struct {
	// SchemaName and TableName identify the table the event belongs to.
	SchemaName string `json:"schemaName"`
	TableName  string `json:"tableName"`
	// QueryType is the kind of change.
	QueryType  string `json:"queryType"`
	// CommitTs is the commit ts of the event.
	// NOTE(review): the JSON key here is "commitTS", while DDLChangedEvent
	// uses "commitTs" — confirm the casing difference is intentional.
	CommitTs   uint64 `json:"commitTS"`

	// IsDDL marks the event as a DDL; DdlQuery carries the DDL query text.
	IsDDL    bool   `json:"isDDL"`
	DdlQuery string `json:"ddlQuery"`

	// The table synchronized by oceanbase needs to have at least one valid index. The definition of a valid index is as follows:
	// 1. The primary key (PRIMARY KEY) is a valid index.
	// 2. A unique index (UNIQUE INDEX) is valid when each of its columns is explicitly defined as NOT NULL in the table structure and none of them is a virtual generated column (VIRTUAL GENERATED COLUMNS).
	ValidUniqColumns map[string]interface{} `json:"validUniqColumns"`

	// ColumnType maps string keys to string type values (presumably column
	// name to column data type — TODO confirm). It has no json tag, so
	// encoding/json uses the Go field name "ColumnType" as the key.
	ColumnType    map[string]string
	// NewColumnData holds the column data after the change.
	NewColumnData map[string]interface{} `json:"newColumnData"`
	// Only when this message is generated by an Update type event, record the name of each column and the data value before Update
	OldColumnData map[string]interface{} `json:"oldColumnData"`
}

func (*RowChangedEvent) Delete

func (e *RowChangedEvent) Delete(dbTypeS, dbTypeT, schemaNameT string,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	caseFieldRuleT string) (string, []interface{}, error)

func (*RowChangedEvent) DeleteFromObMySQL

func (e *RowChangedEvent) DeleteFromObMySQL(
	dbTypeT string,
	schemaNameT string,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	caseFieldRuleT string) (string, []interface{}, error)

func (*RowChangedEvent) Insert

func (e *RowChangedEvent) Insert(dbTypeS, dbTypeT, schemaNameT string,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	caseFieldRuleT string) (string, []interface{}, error)

func (*RowChangedEvent) InsertFromObMySQL

func (e *RowChangedEvent) InsertFromObMySQL(dbTypeT, schemaNameT string,
	tableRoute []*rule.TableRouteRule,
	columnRoute []*rule.ColumnRouteRule,
	caseFieldRuleT string) (string, []interface{}, error)

func (*RowChangedEvent) String

func (e *RowChangedEvent) String() string

type RowEventDecoder

// RowEventDecoder is an abstraction over event decoders: key/value pairs are
// fed in with AddKeyValue and then pulled back out as row-changed or DDL
// events.
type RowEventDecoder interface {
	// AddKeyValue adds the received key and value to the decoder.
	// It should be called before `HasNext`.
	// The decoder decodes the key and value into the event format.
	AddKeyValue(key, value []byte) error

	// HasNext returns
	//     1. the type of the next event
	//     2. a bool reporting whether a next event exists
	//     3. an error
	HasNext() (string, bool, error)
	// NextRowChangedEvent returns the next row changed event if it exists.
	NextRowChangedEvent() (*RowChangedEvent, error)
	// NextDDLEvent returns the next DDL event if it exists.
	NextDDLEvent() (*DDLChangedEvent, error)
}

RowEventDecoder is an abstraction for event decoders. This interface is currently used only for testing.

func NewDecoder

func NewDecoder(dbTypeS, caseFieldRuleS string) RowEventDecoder

NewDecoder creates a new Decoder.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳