Documentation
¶
Overview ¶
Работа с kafka
Используется библиотека github.com/confluentinc/confluent-kafka-go/kafka, основанная на нативном клиенте.
Так как под windows такового нет, то под ними работать не будет. Ну и не надо. Но если приспичит, есть другие библиотеки, где клиент реализован на go. Но они, по очевидным причинам, сильно медленнее и прожорливее по памяти.
Index ¶
- Constants
- Variables
- func LibraryVersion() (version string)
- type AdminClient
- type AssignedPartitions
- type AssignedPartitionsList
- type Config
- func (c *Config) Check(cfg any) (err error)
- func (c *Config) NewAdmin() (client *AdminClient, err error)
- func (c *Config) NewAdminEx(extra misc.InterfaceMap) (client *AdminClient, err error)
- func (c *Config) NewConsumer() (client *Consumer, err error)
- func (c *Config) NewConsumerEx(extra misc.InterfaceMap) (client *Consumer, err error)
- func (c *Config) NewProducer() (client *Producer, err error)
- func (c *Config) NewProducerEx(extra misc.InterfaceMap) (client *Producer, err error)
- type Consumer
- func (c *Consumer) AssignedPartitions() AssignedPartitions
- func (c *Consumer) Close()
- func (c *Consumer) Commit(message *Message) (err error)
- func (c *Consumer) Offsets(topics []string) (offsets []Offset, err error)
- func (c *Consumer) Read(timeout time.Duration) (message *Message, err error)
- func (c *Consumer) Seek(topic string, offset Offset) (err error)
- func (c *Consumer) Subscribe(topics []string) (err error)
- func (c *Consumer) Unsubscribe() (err error)
- func (c *Consumer) WaitingForAssign()
- type ConsumerTopicConfig
- type Error
- type Message
- type Messages
- type Metadata
- type Offset
- type Producer
- type ProducerTopicConfig
Constants ¶
const ( // Смешение - начало OffsetBeginning = Offset(kafka.OffsetBeginning) // Смещение - конец OffsetEnd = Offset(kafka.OffsetEnd) // Смещение - сохраненное в kafka OffsetStored = Offset(kafka.OffsetStored) // PartitionAny represents any partition (for partitioning), // or unspecified value (for all other cases) PartitionAny = kafka.PartitionAny )
Variables ¶
var ( // Log facility Log = log.NewFacility("kafka") // Ошибка - конец данных ErrPartitionEOF = errors.New("partition EOF") )
Functions ¶
Types ¶
type AdminClient ¶
type AdminClient struct {
// contains filtered or unexported fields
}
Админский клиент
func (*AdminClient) CreateTopic ¶
func (c *AdminClient) CreateTopic(name string, topic *ProducerTopicConfig) (err error)
Создать топик
func (*AdminClient) DeleteTopic ¶
func (c *AdminClient) DeleteTopic(topic string) (err error)
Удалить топик
func (*AdminClient) GetMetadata ¶
func (c *AdminClient) GetMetadata(topic string) (m *Metadata, err error)
Получить метаданные для топика. Если передано пустое имя, то всех.
type AssignedPartitions ¶ added in v0.1.34
type AssignedPartitions map[string]AssignedPartitionsList
type AssignedPartitionsList ¶ added in v0.1.34
type AssignedPartitionsList []int
type Config ¶
type Config struct { Servers string `toml:"servers"` // Список kafka серверов User string `toml:"user"` // Пользователь Password string `toml:"password"` // Пароль Timeout config.Duration `toml:"timeout"` // Таймаут RetryTimeout config.Duration `toml:"retry-timeout"` // Таймаут повторной отправки MaxRequestSize int `toml:"max-request-size"` // Максимальный размер сообщения Group string `toml:"group"` // Группа для консьюмера AutoCommit bool `toml:"auto-commit"` // Использовать auto commit для консьюмера? ConsumeInSeparateThreads bool `toml:"consume-in-separate-threads"` // Обрабатывать каждый топик в отдельном потоке ProducerTopics map[string]*ProducerTopicConfig `toml:"producer-topics"` // Список топиков продюсера с их параметрами map[virtualName]*config ConsumerTopics map[string]*ConsumerTopicConfig `toml:"consumer-topics"` // Список топиков консьюмера с их параметрами map[virtualName]*config Consumer *Consumer `toml:"-"` Producer *Producer `toml:"-"` }
Конфигурация
func (*Config) NewAdmin ¶
func (c *Config) NewAdmin() (client *AdminClient, err error)
Создать новое админское соединение
func (*Config) NewAdminEx ¶ added in v0.1.15
func (c *Config) NewAdminEx(extra misc.InterfaceMap) (client *AdminClient, err error)
func (*Config) NewConsumer ¶
Создать новое консьюмерское соединение
func (*Config) NewConsumerEx ¶ added in v0.1.15
func (c *Config) NewConsumerEx(extra misc.InterfaceMap) (client *Consumer, err error)
func (*Config) NewProducer ¶
Создать новое продюсерское соединение
func (*Config) NewProducerEx ¶ added in v0.1.15
func (c *Config) NewProducerEx(extra misc.InterfaceMap) (client *Producer, err error)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
консьюмер
func (*Consumer) AssignedPartitions ¶ added in v0.1.34
func (c *Consumer) AssignedPartitions() AssignedPartitions
func (*Consumer) Unsubscribe ¶ added in v0.1.3
Отписаться от всех подписок
func (*Consumer) WaitingForAssign ¶ added in v0.1.19
func (c *Consumer) WaitingForAssign()
Ожидание получения первого AssignedPartitions
type ConsumerTopicConfig ¶ added in v0.1.1
type ConsumerTopicConfig struct { Active bool `toml:"active"` // Активный? Type string `toml:"type"` // Тип топика. Произвольное необязательное значение на усмотрение разработчика Encoding string `toml:"encoding"` // Формат данных Extra any `toml:"extra"` // Произвольные дополнительные данные }
Параметры топика консьюмера
func (*ConsumerTopicConfig) Check ¶ added in v0.1.1
func (c *ConsumerTopicConfig) Check(cfg any) (err error)
Проверка валидности ProducerTopicConfig
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Продюсер
func (*Producer) SaveMessages ¶
Сохранить сообщения в kafka
type ProducerTopicConfig ¶ added in v0.1.1
type ProducerTopicConfig struct { Active bool `toml:"active"` // Активный? Type string `toml:"type"` // Тип топика. Произвольное необязательное значение на усмотрение разработчика NumPartitions int `toml:"num-partitions"` // Количество партиций при создании ReplicationFactor int `toml:"replication-factor"` // Фактор репликации при создании RetentionTime config.Duration `toml:"retention-time"` // Время жизни данных RetentionSize int64 `toml:"retention-size"` // Максимальный размер для очистки по размеру Extra any `toml:"extra"` // Произвольные дополнительные данные }
Параметры топика продюсера
func (*ProducerTopicConfig) Check ¶ added in v0.1.1
func (c *ProducerTopicConfig) Check(cfg any) (err error)
Проверка валидности ProducerTopicConfig