Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrMessageSizeLimit indicates that a message was rejected by the server due to its size limit
	ErrMessageSizeLimit = errors.New("message was too large, server rejected it to avoid allocation error")
)
Functions ¶
func CreateTLSConfig ¶ added in v0.6.0
func CreateTLSConfig(tlsConfig auth.TLS) (*tls.Config, error)
CreateTLSConfig returns a TLS config for the given auth.TLS settings
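A minimal sketch of feeding the result into a sarama client configuration; the import paths here and the way the auth.TLS settings are populated are assumptions, not part of this package's documented API:

	import (
		"github.com/Shopify/sarama"

		// import paths assumed; adjust to wherever this messaging package
		// and its auth dependency live in your build
		"github.com/temporalio/temporal/common/auth"
		"github.com/temporalio/temporal/common/messaging"
	)

	// newSaramaConfig builds a sarama.Config whose TLS settings come from
	// CreateTLSConfig; tlsSettings is assumed to be loaded from YAML elsewhere.
	func newSaramaConfig(tlsSettings auth.TLS) (*sarama.Config, error) {
		cfg := sarama.NewConfig()
		tlsCfg, err := messaging.CreateTLSConfig(tlsSettings)
		if err != nil {
			return nil, err
		}
		if tlsCfg != nil {
			cfg.Net.TLS.Enable = true
			cfg.Net.TLS.Config = tlsCfg
		}
		return cfg, nil
	}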
Types ¶
type Client ¶
type Client interface {
	NewConsumer(appName, consumerName string, concurrency int) (Consumer, error)
	NewConsumerWithClusterName(currentCluster, sourceCluster, consumerName string, concurrency int) (Consumer, error)
	NewProducer(appName string) (Producer, error)
	NewProducerWithClusterName(sourceCluster string) (Producer, error)
}
Client is the interface used to abstract out interaction with the messaging system for replication
func NewKafkaClient ¶ added in v0.4.0
func NewKafkaClient(kc *KafkaConfig, metricsClient metrics.Client, zLogger *zap.Logger, logger log.Logger, metricScope tally.Scope,
	checkCluster, checkApp bool) Client
NewKafkaClient is used to create an instance of KafkaClient
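A minimal wiring sketch; kafkaConfig, metricsClient, zapLogger, and logger are assumed to be constructed elsewhere, a no-op tally scope is used for brevity, and the application name is hypothetical:

	client := messaging.NewKafkaClient(&kafkaConfig, metricsClient, zapLogger, logger, tally.NoopScope,
		true, // checkCluster
		true, // checkApp
	)
	producer, err := client.NewProducer("replication-app")
	if err != nil {
		// handle error
	}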
type CloseableProducer ¶ added in v0.27.0
type CloseableProducer interface {
	Producer
	Close() error
}
CloseableProducer is a Producer that can be closed
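Producers obtained from the client are returned as plain Producer values; when the concrete implementation is also a CloseableProducer, a type assertion, as in this sketch, lets a caller release the underlying connection on shutdown:

	// producer is a messaging.Producer obtained from the client
	if cp, ok := producer.(messaging.CloseableProducer); ok {
		defer cp.Close() // error ignored here for brevity
	}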
type ClusterConfig ¶
type ClusterConfig struct {
	Brokers []string `yaml:"brokers"`
}
ClusterConfig describes the configuration for a single Kafka cluster
type Consumer ¶ added in v0.4.0
type Consumer interface {
	// Start starts the consumer
	Start() error
	// Stop stops the consumer
	Stop()
	// Messages returns the message channel for this consumer
	Messages() <-chan Message
}
Consumer is the unified interface for both internal and external Kafka clients
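A minimal consumption loop sketch; the application name, consumer name, concurrency, and the handleTask helper are hypothetical:

	consumer, err := client.NewConsumer("replication-app", "replication-consumer", 10)
	if err != nil {
		// handle error
	}
	if err := consumer.Start(); err != nil {
		// handle error
	}
	defer consumer.Stop()

	for msg := range consumer.Messages() {
		if err := handleTask(msg.Value()); err != nil {
			msg.Nack() // failed messages are retried or routed to the DLQ
			continue
		}
		msg.Ack()
	}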
type KafkaConfig ¶
type KafkaConfig struct {
	TLS            auth.TLS                 `yaml:"tls"`
	Clusters       map[string]ClusterConfig `yaml:"clusters"`
	Topics         map[string]TopicConfig   `yaml:"topics"`
	ClusterToTopic map[string]TopicList     `yaml:"temporal-cluster-topics"`
	Applications   map[string]TopicList     `yaml:"applications"`
}
KafkaConfig describes the configuration needed to connect to all Kafka clusters
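A configuration sketch using only the fields shown above; in practice KafkaConfig is usually unmarshalled from YAML, where ClusterToTopic is keyed as temporal-cluster-topics. The cluster, topic, and broker names are hypothetical:

	kafkaConfig := messaging.KafkaConfig{
		TLS: tlsSettings, // auth.TLS value, assumed to be populated elsewhere
		Clusters: map[string]messaging.ClusterConfig{
			"primary": {Brokers: []string{"broker1:9092", "broker2:9092"}},
		},
		Topics: map[string]messaging.TopicConfig{
			"replication-topic": {Cluster: "primary"},
		},
		// ClusterToTopic and Applications map cluster and application names
		// to TopicList values (not shown here)
	}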
type Message ¶ added in v0.4.0
type Message interface {
	// Value is a mutable reference to the message's value
	Value() []byte
	// Partition is the ID of the partition from which the message was read.
	Partition() int32
	// Offset is the message's offset.
	Offset() int64
	// Ack marks the message as successfully processed.
	Ack() error
	// Nack marks the message processing as failed and the message will be retried or sent to DLQ.
	Nack() error
}
Message is the unified interface for a Kafka message
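Ack and Nack are exercised in the consumption loop sketched under Consumer; Partition and Offset are typically only needed for diagnostics, for example logging the coordinates of a message that failed processing (the logging call here is illustrative only):

	if err := handleTask(msg.Value()); err != nil {
		fmt.Printf("processing failed: partition=%d offset=%d: %v\n", msg.Partition(), msg.Offset(), err)
		msg.Nack()
	}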
type Producer ¶
type Producer interface {
	Publish(message interface{}) error
}
Producer is the interface used to send replication tasks to other clusters through the replicator
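Publish accepts the message as an interface{}; callers may want to check the returned error against ErrMessageSizeLimit, since an oversized payload should not be retried unchanged. A hedged sketch, where task is a hypothetical replication task value:

	if err := producer.Publish(task); err != nil {
		if errors.Is(err, messaging.ErrMessageSizeLimit) {
			// payload exceeded the broker's size limit: drop, split, or
			// compress it instead of retrying as-is
		}
		// handle other errors
	}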
func NewKafkaProducer ¶
func NewKafkaProducer(topic string, producer sarama.SyncProducer, logger log.Logger) Producer
NewKafkaProducer is used to create the Kafka based producer implementation
func NewMetricProducer ¶ added in v0.5.0
func NewMetricProducer(producer Producer,
	metricsClient metrics.Client) Producer
NewMetricProducer creates a new instance of producer that emits metrics
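A sketch of assembling a producer by hand instead of through Client: create a sarama.SyncProducer, wrap it with NewKafkaProducer, then layer NewMetricProducer on top so each publish emits metrics. The broker address, topic name, sarama config, logger, and metrics client are assumptions:

	syncProducer, err := sarama.NewSyncProducer([]string{"broker1:9092"}, saramaCfg)
	if err != nil {
		// handle error
	}
	kafkaProducer := messaging.NewKafkaProducer("replication-topic", syncProducer, logger)
	producer := messaging.NewMetricProducer(kafkaProducer, metricsClient)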
type TopicConfig ¶
type TopicConfig struct {
	Cluster string `yaml:"cluster"`
}
TopicConfig describes the mapping from topic to Kafka cluster