Documentation
¶
Index ¶
- Constants
- Variables
- func NewMQ(network string, address []string) (mq.MQ, error)
- type Consumer
- type MQ
- func (m *MQ) Close() error
- func (m *MQ) Consumer(topic, groupID string) (mq.Consumer, error)
- func (m *MQ) CreateTopic(ctx context.Context, name string, partitions int) error
- func (m *MQ) DeleteTopics(ctx context.Context, topics ...string) error
- func (m *MQ) Producer(topic string) (mq.Producer, error)
- type Producer
- type SpecifiedPartitionBalancer
Constants ¶
View Source
const (
SpecifiedPartitionKey = "specifiedPartition"
)
Variables ¶
View Source
var ErrInvalidArgument = errors.New("invalid argument")
Functions ¶
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
type MQ ¶
type MQ struct {
// contains filtered or unexported fields
}
func (*MQ) CreateTopic ¶
func (*MQ) DeleteTopics ¶
DeleteTopics 删除topic
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
type SpecifiedPartitionBalancer ¶
type SpecifiedPartitionBalancer struct {
// contains filtered or unexported fields
}
SpecifiedPartitionBalancer 借助kafka客户端提供的Balancer接口实现向指定分区生产消息 如果在message.WriterData中找到指定的partition信息则直接用其作为目标partition 如果没有找到则使用默认负载均衡器计算目标partition
func NewSpecifiedPartitionBalancer ¶
func NewSpecifiedPartitionBalancer(defaultBalancer kafkago.Balancer) (*SpecifiedPartitionBalancer, error)
Click to show internal directories.
Click to hide internal directories.