Documentation
Types
type ChangeMessage
type ChangeMessage struct {
	Schema struct {
		Type   string `json:"type"`
		Fields []struct {
			Type   string `json:"type"`
			Fields []struct {
				Type     string `json:"type"`
				Optional bool   `json:"optional"`
				Field    string `json:"field"`
			} `json:"fields,omitempty"`
			Optional bool   `json:"optional"`
			Name     string `json:"name,omitempty"`
			Field    string `json:"field"`
		} `json:"fields"`
		Optional bool   `json:"optional"`
		Name     string `json:"name"`
	} `json:"schema"`
	Payload struct {
		Before json.RawMessage `json:"before"`
		After  json.RawMessage `json:"after"`
		Source struct {
			Version   string      `json:"version"`
			Connector string      `json:"connector"`
			Name      string      `json:"name"`
			TsMs      int64       `json:"ts_ms"`
			Snapshot  string      `json:"snapshot"`
			Db        string      `json:"db"`
			Schema    string      `json:"schema"`
			Table     string      `json:"table"`
			TxID      int         `json:"txId"`
			Lsn       int         `json:"lsn"`
			Xmin      interface{} `json:"xmin"`
		} `json:"source"`
		Op   string `json:"op"`
		TsMs int64  `json:"ts_ms"`
	} `json:"payload"`
}
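ChangeMessage is the JSON envelope of a single change event read from the changes topic. Schema describes the fields of the event. Payload carries the row state before and after the change (Before, After) as raw JSON, metadata about the originating database and table (Source), the operation code (Op), and a millisecond timestamp (TsMs).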
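As a rough illustration of how such an event could be decoded, the sketch below uses a trimmed, hypothetical stand-in for ChangeMessage (`changeMessage`) that keeps only a few payload fields with the same JSON tags. The sample event, `decodeChange`, and `isNull` are assumptions made for this example and are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changeMessage is a trimmed, hypothetical stand-in for ChangeMessage.
// It keeps only the payload fields used below; the JSON tags are unchanged.
type changeMessage struct {
	Payload struct {
		Before json.RawMessage `json:"before"`
		After  json.RawMessage `json:"after"`
		Source struct {
			Db     string `json:"db"`
			Schema string `json:"schema"`
			Table  string `json:"table"`
		} `json:"source"`
		Op   string `json:"op"`
		TsMs int64  `json:"ts_ms"`
	} `json:"payload"`
}

// sampleEvent is made-up data for illustration only.
const sampleEvent = `{
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "alice"},
    "source": {"db": "shop", "schema": "public", "table": "users"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}`

// decodeChange parses one raw event into the trimmed struct.
func decodeChange(raw []byte) (*changeMessage, error) {
	var msg changeMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, fmt.Errorf("decode change message: %w", err)
	}
	return &msg, nil
}

// isNull reports whether a raw JSON value is missing or the literal null.
// encoding/json stores the bytes "null" in a json.RawMessage field when the
// input value is null, so both cases have to be checked.
func isNull(raw json.RawMessage) bool {
	return len(raw) == 0 || string(raw) == "null"
}

func main() {
	msg, err := decodeChange([]byte(sampleEvent))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("table:", msg.Payload.Source.Table)
	fmt.Println("op:", msg.Payload.Op)
	fmt.Println("has before image:", !isNull(msg.Payload.Before))
	fmt.Println("has after image:", !isNull(msg.Payload.After))
}
```

Keeping Before and After as json.RawMessage defers decoding of the row itself, so a caller can choose a table-specific struct after inspecting Source.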
type Filter
Filter reads the change stream, discards the events that are not of interest, and writes new messages derived from the remaining changes to the filtered topic.
func NewFilter
func NewFilter(kafkaAddress, kafkaGroupID, changesTopic, filteredTopic string, filter FilterFunc) *Filter
NewFilter returns an initialized Filter.
type FilterFunc
type FilterFunc func(*ChangeMessage) ([]kafka.Message, error)
FilterFunc takes a ChangeMessage read from the changesTopic and returns zero, one, or multiple Kafka messages that should be written to the filteredTopic.
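The following is a minimal sketch of what a filter function of this shape might look like. Because the Kafka client package behind `kafka.Message` is not named here, the example uses hypothetical local stand-ins (`message`, `changeMessage`, `filterFunc`) so that it compiles on its own. The `onlyTable` helper and its behavior are assumptions for illustration, not part of the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for kafka.Message (key and value only).
type message struct {
	Key   []byte
	Value []byte
}

// changeMessage is a trimmed, hypothetical stand-in for ChangeMessage.
type changeMessage struct {
	Payload struct {
		After  json.RawMessage `json:"after"`
		Source struct {
			Table string `json:"table"`
		} `json:"source"`
		Op string `json:"op"`
	} `json:"payload"`
}

// filterFunc mirrors the shape of FilterFunc using the stand-in types.
type filterFunc func(*changeMessage) ([]message, error)

// onlyTable returns a filterFunc that forwards the "after" row state of
// events from a single table and drops every other event.
func onlyTable(table string) filterFunc {
	return func(cm *changeMessage) ([]message, error) {
		if cm == nil {
			return nil, errors.New("nil change message")
		}
		if cm.Payload.Source.Table != table {
			return nil, nil // zero messages: the event is not of interest
		}
		after := cm.Payload.After
		if len(after) == 0 || string(after) == "null" {
			return nil, nil // no row state to forward
		}
		// One output message per matching event, keyed by table name.
		return []message{{Key: []byte(table), Value: after}}, nil
	}
}

func main() {
	filter := onlyTable("users")

	var cm changeMessage
	cm.Payload.Source.Table = "users"
	cm.Payload.After = json.RawMessage(`{"id":1}`)

	out, err := filter(&cm)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, m := range out {
		fmt.Printf("key=%s value=%s\n", m.Key, m.Value)
	}
}
```

Returning a slice lets one change event fan out into several output messages, while an empty result with a nil error signals that the event should simply be skipped.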