Documentation
¶
Index ¶
- Variables
- func ConfigEntries(props config.KafkaProperties) []kafka.ConfigEntry
- func DefaultTopicConfigs(props config.KafkaProperties, topics ...string) []kafka.TopicConfig
- func KafkaError(err error) kafka.Error
- func NewDialer(cfg *config.KeyFile) *kafka.Dialer
- type Client
- func (c *Client) Close() error
- func (c *Client) CreateTopics(ctx context.Context, topics ...string) (api.TopicErrors, error)
- func (c *Client) DeleteTopics(ctx context.Context, topics ...string) (api.TopicErrors, error)
- func (c *Client) GetLogger() api.LoggerFunc
- func (c *Client) IsExistsError(err error) bool
- func (c *Client) NewReader(topic string) api.Reader
- func (c *Client) NewWriter() api.Writer
- func (c *Client) Read(ctx context.Context, topic string, partition int, offset *uint64) (api.Message, error)
- func (c *Client) SetLogger(fn api.LoggerFunc)
- func (c *Client) Write(ctx context.Context, topic string, msg ...api.Message) error
- type Message
- type Reader
- type Writer
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrOffsetRequired = errors.New("offset required") ErrInvalidOffset = errors.New("invalid offset") )
Functions ¶
func ConfigEntries ¶
func ConfigEntries(props config.KafkaProperties) []kafka.ConfigEntry
func DefaultTopicConfigs ¶
func DefaultTopicConfigs(props config.KafkaProperties, topics ...string) []kafka.TopicConfig
func KafkaError ¶
func KafkaError(err error) kafka.Error
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CreateTopics ¶
func (*Client) DeleteTopics ¶
func (*Client) GetLogger ¶
func (c *Client) GetLogger() api.LoggerFunc
func (*Client) IsExistsError ¶
func (*Client) SetLogger ¶
func (c *Client) SetLogger(fn api.LoggerFunc)
Click to show internal directories.
Click to hide internal directories.