Documentation
¶
Index ¶
- Variables
- func GetKafkaDialer(options ...DialerOption) *kafka.Dialer
- func NewGroupReader(brokers []string, topic, groupID string, options ...ReaderOption) (r *kafka.Reader, e error)
- func NewPartitionReader(brokers []string, topic string, partition int, options ...ReaderOption) (r *kafka.Reader, e error)
- func NewPartitionWriter(brokers []string, topic string, partition int, options ...WriterOption) (w *kafka.Writer, e error)
- func NewWriter(brokers []string, topic string, options ...WriterOption) (w *kafka.Writer, e error)
- func SetLog(l logger.Logger)
- type DialerOption
- type PartitionBalancer
- type ReaderOption
- type WriterOption
- func WriterWithAsync() WriterOption
- func WriterWithBatchBytes(batchBytes int) WriterOption
- func WriterWithBatchSize(batchSize int) WriterOption
- func WriterWithBatchTimeout(batchTimeout time.Duration) WriterOption
- func WriterWithDialer(dialer *kafka.Dialer) WriterOption
- func WriterWithLogger(debugLogger, errorLogger kafka.Logger) WriterOption
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	KLogger      = &debugLogger{log}
	KErrorLogger = &errorLogger{log}
)
Functions ¶
func GetKafkaDialer ¶ added in v1.2.0
func GetKafkaDialer(options ...DialerOption) *kafka.Dialer
func NewGroupReader ¶
func NewPartitionReader ¶
func NewPartitionWriter ¶
Types ¶
type DialerOption ¶ added in v1.2.0
type DialerOption func(*kafka.Dialer)
func DialerWithCaCertTls ¶ added in v1.2.0
func DialerWithCaCertTls(file string) DialerOption
func DialerWithUserAndPasswd ¶ added in v1.2.0
func DialerWithUserAndPasswd(user, passwd string) DialerOption
type PartitionBalancer ¶
type PartitionBalancer struct {
// contains filtered or unexported fields
}
partitionBalancer
func (*PartitionBalancer) Balance ¶
func (p *PartitionBalancer) Balance(_ kafka.Message, _ ...int) (partition int)
type ReaderOption ¶
type ReaderOption func(*kafka.ReaderConfig)
func ReaderWithCommitInterval ¶ added in v1.2.0
func ReaderWithCommitInterval(commitInterval time.Duration) ReaderOption
ReaderWithCommitInterval: be careful, setting commitInterval may cause messages to be consumed more than once if the server exits on an error.
func ReaderWithDialer ¶
func ReaderWithDialer(dialer *kafka.Dialer) ReaderOption
func ReaderWithMaxBytes ¶ added in v1.2.0
func ReaderWithMaxBytes(maxBytes int) ReaderOption
func ReaderWithMinBytes ¶ added in v1.2.0
func ReaderWithMinBytes(minBytes int) ReaderOption
func ReaderWithOffset ¶
func ReaderWithOffset(offset int64) ReaderOption
type WriterOption ¶
type WriterOption func(*kafka.WriterConfig)
func WriterWithAsync ¶
func WriterWithAsync() WriterOption
func WriterWithBatchBytes ¶ added in v1.2.0
func WriterWithBatchBytes(batchBytes int) WriterOption
func WriterWithBatchSize ¶ added in v1.2.0
func WriterWithBatchSize(batchSize int) WriterOption
func WriterWithBatchTimeout ¶ added in v1.2.0
func WriterWithBatchTimeout(batchTimeout time.Duration) WriterOption
func WriterWithDialer ¶ added in v1.2.0
func WriterWithDialer(dialer *kafka.Dialer) WriterOption
func WriterWithLogger ¶ added in v1.2.0
func WriterWithLogger(debugLogger, errorLogger kafka.Logger) WriterOption
Click to show internal directories.
Click to hide internal directories.