Documentation
¶
Index ¶
- Constants
- func NewSyncWaitGroup() *syncWaitGroup
- type BinlogParams
- type DataFilterCondition
- type Filter
- type SyncParams
- func (s *SyncParams) Clone(eventType string) *SyncParams
- func (s *SyncParams) GetBingLogParams() *BinlogParams
- func (s *SyncParams) GetIdentifyId() string
- func (s *SyncParams) GetJoinColumn() string
- func (s *SyncParams) GetUpdateValues(updatedColumns []string) interface{}
- func (s *SyncParams) GetWg() *syncWaitGroup
- func (s *SyncParams) IsPrimaryKeyUpdated() bool
- func (s *SyncParams) MergeOldToData() map[string]string
- func (s *SyncParams) Recycle()
- func (s *SyncParams) SetBinLogParams(params *BinlogParams)
- type SyncRule
Constants ¶
View Source
const ConditionTypeAnd = "and" // 多条件与关系
View Source
const ConditionTypeOr = "or" // 多条件或关系
View Source
const DataSourceElasticSearch = "es" // es 类型数据源
View Source
const DataSourceMysql = "mysql" // mysql 类型数据源
View Source
const EventTypeDelete = "delete" // 删除事件
View Source
const EventTypeInsert = "insert" // 插入事件
View Source
const EventTypeUpdate = "update" // 更新事件
View Source
const InnerSyncTypeDefaultKey = "innerSyncTypeDefaultKey"
View Source
const ReaderTypeKafka = "kafka" // kafka 类型读取器
View Source
const ReaderTypeWeb = "web" // web 类型读取器
View Source
const SyncTypeCopy = "copy" // 可复制的字段全部拷贝到目标数据源作为一条新的记录
View Source
const SyncTypeInner = "inner" // 只允许复制一个字段,复制到目标数据源符合记录的某个字段,这个字段一定是一个数组
View Source
const SyncTypeJoin = "join" // 可复制的字段全部拷贝到目标数据源中符合条件的记录,作为这条记录的某个对象字段
View Source
const TimestampCreatedAt = "created_at" // 创建时间戳
View Source
const TimestampUpdatedAt = "updated_at" // 更新时间戳
Variables ¶
This section is empty.
Functions ¶
func NewSyncWaitGroup ¶
func NewSyncWaitGroup() *syncWaitGroup
Types ¶
type BinlogParams ¶
type BinlogParams struct { EventId string `json:"event_id" binding:"required"` // 事件ID 唯一 Database string `json:"database" binding:"required"` // 库名 Table string `json:"table" binding:"required"` // 表名 EventAt int64 `json:"ts" binding:"required"` // 事件时间 EventType string `json:"type" binding:"required"` // 事件类型 IsDdl bool `json:"isDdl" binding:"omitempty"` // 是否 ddl修改(ddl 修改不处理) Data []map[string]string `json:"data" binding:"required"` // 更新后数据 (全量数据,根据 canal: canal.instance.filter.regex 的字段规则,没有字段规则就是全量) Old []map[string]string `json:"old" binding:"omitempty"` // 更新前数据 (只存在被更新的字段) Source interface{} `json:"-" binding:"omitempty"` // 原始数据 }
func (*BinlogParams) UnmarshalJSON ¶
func (c *BinlogParams) UnmarshalJSON(bytes []byte) error
UnmarshalJSON 重写 json 解析方法,如果是更新事件,记录本次更新的字段
type DataFilterCondition ¶
type DataFilterCondition struct { Column string `json:"column,omitempty"` // 字段名 Operator string `json:"operator,omitempty"` // 运算符 Value string `json:"value,omitempty"` // 值 ValueColumn string `json:"value_column,omitempty"` // 值字段 value 和 value_column 同时只存在其中一个 Children map[string][]DataFilterCondition `json:"children,omitempty"` // 子条件 }
DataFilterCondition 数据规则条件
type Filter ¶
type Filter interface { InsertEventRecord(params *SyncParams, updatedColumns []string) error FilterColumns(params *SyncParams, columns []string) ([]string, error, bool) }
type SyncParams ¶
type SyncParams struct { Rule SyncRule `json:"rule"` // 只读,不用指针传递 Data map[string]string `json:"data"` Old map[string]string `json:"old"` RealEventType string `json:"real_event_type"` // 和 BinlogParams 的 EventType 重复,用于记录真实执行同步的事件类型 // contains filtered or unexported fields }
SyncParams 同步参数 一条 sql 可能修改多条数据,每条数据都会拆分成不同都子任务, 每个子任务都有自己都 SyncParams 所以 SyncParams 不涉及并发
func NewSyncParams ¶
func NewSyncParams(wg *syncWaitGroup, rule *SyncRule, data, old map[string]string, binLog *BinlogParams) *SyncParams
func (*SyncParams) Clone ¶
func (s *SyncParams) Clone(eventType string) *SyncParams
Clone 克隆一个新的同步数据,只有事件类型不同 主要用户软删除字段更新时,更新事件并更为 删除/插入事件
func (*SyncParams) GetBingLogParams ¶
func (s *SyncParams) GetBingLogParams() *BinlogParams
func (*SyncParams) GetJoinColumn ¶
func (s *SyncParams) GetJoinColumn() string
GetJoinColumn 当 SyncType 等于 SyncTypeInner 时只同步一个字段, 暂时缓存起来 同一个 SyncParams 只会被一个协程使用, 所以不存在并发问题
func (*SyncParams) GetUpdateValues ¶
func (s *SyncParams) GetUpdateValues(updatedColumns []string) interface{}
GetUpdateValues 获取当次更新的数据 格式: {"column": "value"}
func (*SyncParams) GetWg ¶
func (s *SyncParams) GetWg() *syncWaitGroup
func (*SyncParams) IsPrimaryKeyUpdated ¶
func (s *SyncParams) IsPrimaryKeyUpdated() bool
IsPrimaryKeyUpdated 判断主键 或关联字段是否更新
func (*SyncParams) MergeOldToData ¶
func (s *SyncParams) MergeOldToData() map[string]string
MergeOldToData 获取这条记录 更新之前的所有数据
func (*SyncParams) Recycle ¶
func (s *SyncParams) Recycle()
func (*SyncParams) SetBinLogParams ¶
func (s *SyncParams) SetBinLogParams(params *BinlogParams)
type SyncRule ¶
type SyncRule struct { Database string `json:"database" yaml:"database"` // 需要同步的库 Table string `json:"table" yaml:"table"` // 需要同步的表 PrimaryKey string `json:"primary_key" yaml:"primary_key"` // 来源表中主键名称 LockColumns []string `json:"lock_columns" yaml:"lock_columns"` // 加锁时 依赖的字段 Columns map[string]string `json:"columns" yaml:"columns"` // 字段映射表 local:target 格式 SystemColumns []string `json:"-" yaml:"-"` // 系统字段 (特殊逻辑) SoftDeleteField string `json:"soft_delete_field,omitempty" yaml:"soft_delete_field,omitempty"` // 软删除字段名称 为空代表不支持软删除 UnSoftDeleteValue string `json:"un_soft_delete_value" yaml:"un_soft_delete_value"` // 未软删除值 SoftDeleteField 不为空且作为key获取到的值不相等及被软删除 DataConditions map[string][]DataFilterCondition `json:"data_conditions,omitempty" yaml:"data_conditions,omitempty"` // 同步条件 key为 and或or 比对结果false 不同步 TargetType string `json:"-" yaml:"target_type"` // 目标类型 mysql|es Target string `json:"target" yaml:"target"` // 目标 mysql:connect.database.table es:connect.index TargetDatabase string `json:"-" yaml:"target_database"` TargetTable string `json:"-" yaml:"target_table"` //Type string `json:"type"` // 同步类型 stats:统计 sync:同步 SyncType string `json:"sync_type" yaml:"sync_type"` // 具体同步或统计类型 JoinFieldName string `json:"join_field_name,omitempty" yaml:"join_field_name,omitempty"` // 加入字段名 sync_type:join|inner 时存在 //SyncConditions []SyncCondition `json:"sync_conditions"` // 同步条件 只允许and条件 TargetExtraParams map[string]string `json:"target_extra_params,omitempty" yaml:"target_extra_params,omitempty"` // 目标额外参数,常量同步时一起写入目标表 }
SyncRule 同步规则
func (*SyncRule) EvaluateFilterConditions ¶
EvaluateFilterConditions 判断是否符合同步条件
func (*SyncRule) UnmarshalJSON ¶
Click to show internal directories.
Click to hide internal directories.