Documentation
¶
Index ¶
- Constants
- Variables
- func GetSink() api.Sink
- func GetSource() api.Source
- type KafkaCollectStats
- type KafkaSink
- func (k *KafkaSink) Close(ctx api.StreamContext) error
- func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error)
- func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error)
- func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (k *KafkaSink) Ping(ctx api.StreamContext, props map[string]any) error
- func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error
- func (k *KafkaSink) ResetStats()
- type KafkaSource
- func (k *KafkaSource) Close(ctx api.StreamContext) error
- func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (k *KafkaSource) GetOffset() (interface{}, error)
- func (k *KafkaSource) Ping(ctx api.StreamContext, props map[string]any) error
- func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) error
- func (k *KafkaSource) ResetOffset(input map[string]interface{}) error
- func (k *KafkaSource) Rewind(offset interface{}) error
- func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error
Constants ¶
View Source
const (
	LblUnmarshal = "unmarshal"
	LblCollect   = "collect"
	LblReq       = "req"
	LblKafka     = "kafka"
	LblMsg       = "msg"
)
View Source
const (
	SASL_NONE  = "none"
	SASL_PLAIN = "plain"
	SASL_SCRAM = "scram"
)
View Source
const (
LblTarget = "target"
)
Variables ¶
View Source
var (
	KafkaSinkCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "kuiper",
		Subsystem: "kafka_sink",
		Name:      "counter",
		Help:      "counter of Kafka Sink IO",
	}, []string{metrics.LblType, LblTarget, metrics.LblRuleIDType, metrics.LblOpIDType})

	KafkaSinkCollectDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "kuiper",
		Subsystem: "kafka_sink",
		Name:      "collect_duration_hist",
		Help:      "Sink Historgram Duration of IO",
		Buckets:   prometheus.ExponentialBuckets(10, 2, 20),
	}, []string{metrics.LblType, LblTarget, metrics.LblRuleIDType, metrics.LblOpIDType})
)
Functions ¶
Types ¶
type KafkaCollectStats ¶ added in v2.0.7
type KafkaSink ¶
type KafkaSink struct {
	LastStats        kafkago.WriterStats
	LastCollectStats *KafkaCollectStats
	// contains filtered or unexported fields
}
func (*KafkaSink) Collect ¶
func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error)
func (*KafkaSink) CollectList ¶
func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error)
func (*KafkaSink) Connect ¶
func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*KafkaSink) ResetStats ¶ added in v2.0.7
func (k *KafkaSink) ResetStats()
type KafkaSource ¶
type KafkaSource struct {
// contains filtered or unexported fields
}
func (*KafkaSource) Close ¶
func (k *KafkaSource) Close(ctx api.StreamContext) error
func (*KafkaSource) Connect ¶
func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*KafkaSource) GetOffset ¶
func (k *KafkaSource) GetOffset() (interface{}, error)
func (*KafkaSource) Ping ¶
func (k *KafkaSource) Ping(ctx api.StreamContext, props map[string]any) error
func (*KafkaSource) Provision ¶
func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) error
func (*KafkaSource) ResetOffset ¶
func (k *KafkaSource) ResetOffset(input map[string]interface{}) error
func (*KafkaSource) Rewind ¶
func (k *KafkaSource) Rewind(offset interface{}) error
func (*KafkaSource) Subscribe ¶
func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error
Click to show internal directories.
Click to hide internal directories.