kafkaavro

package
v0.0.0-...-ff1bf79 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2021 License: MIT, Apache-2.0 Imports: 11 Imported by: 0

README

Credits

kafka/avro code origins from https://github.com/mycujoo/go-kafka-avro from mycujoo.tv "Democratizing football broadcasting."

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsErrFailedCommit

func IsErrFailedCommit(err error) bool

func IsErrInvalidValue

func IsErrInvalidValue(err error) bool

Types

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *schemaregistry.Client
	// contains filtered or unexported fields
}

CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)

func (*CachedSchemaRegistryClient) DeleteSubject

func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)

DeleteSubject deletes the subject, should only be used in development

func (*CachedSchemaRegistryClient) GetLatestSchema

func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (avro.Schema, error)

GetLatestSchema returns the highest version schema for a subject

func (*CachedSchemaRegistryClient) GetSchemaByID

func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (avro.Schema, error)

GetSchemaByID will return and cache the schema with the given id

func (*CachedSchemaRegistryClient) GetSchemaBySubject

func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (avro.Schema, error)

GetSchemaBySubject returns the schema for a specific version of a subject

func (*CachedSchemaRegistryClient) IsSchemaRegistered

func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, schema avro.Schema) (bool, schemaregistry.Schema, error)

IsSchemaRegistered checks if a specific schema is already registered to a subject

func (*CachedSchemaRegistryClient) RegisterNewSchema

func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, schema avro.Schema) (int, error)

RegisterNewSchema will return and cache the id with the given schema

func (*CachedSchemaRegistryClient) Subjects

func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)

Subjects returns a list of subjects

func (*CachedSchemaRegistryClient) Versions

func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)

Versions returns a list of all versions of a subject

type Consumer

type Consumer struct {
	KafkaConsumer
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(topics []string, valueFactory ValueFactory, opts ...ConsumerOption) (*Consumer, error)

NewConsumer is a basic consumer to interact with schema registry, avro and kafka

func (*Consumer) EnsureTopics

func (ac *Consumer) EnsureTopics(topics []string) error

EnsureTopics returns error if one of the consumed topics was not found on the server.

func (*Consumer) FetchMessage

func (ac *Consumer) FetchMessage(timeoutMs int) (*Message, error)

func (*Consumer) ReadMessage

func (ac *Consumer) ReadMessage(timeoutMs int) (*Message, error)

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

func WithEventHandler

func WithEventHandler(handler EventHandler) ConsumerOption

func WithKafkaConsumer

func WithKafkaConsumer(consumer KafkaConsumer) ConsumerOption

func WithoutTopicsCheck

func WithoutTopicsCheck() ConsumerOption

type ErrFailedCommit

type ErrFailedCommit struct {
	Err error
}

func (ErrFailedCommit) Error

func (e ErrFailedCommit) Error() string

func (ErrFailedCommit) Unwrap

func (e ErrFailedCommit) Unwrap() error

type ErrInvalidValue

type ErrInvalidValue struct {
	Topic string
}

func (ErrInvalidValue) Error

func (e ErrInvalidValue) Error() string

type EventHandler

type EventHandler func(event kafka.Event)

type KafkaConsumer

type KafkaConsumer interface {
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	Poll(timeoutMs int) kafka.Event
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
}

type KafkaProducer

type KafkaProducer interface {
	Close()
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
}

type Message

type Message struct {
	*kafka.Message
	Value interface{}
}

type Producer

type Producer struct {
	KafkaProducer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(
	topicName string,
	partition int,
	keySchemaJSON, valueSchemaJSON string,
	opts ...ProducerOption,
) (*Producer, error)

NewProducer is a producer that publishes messages to kafka topic using avro serialization format

func (*Producer) Produce

func (ap *Producer) Produce(key interface{}, value interface{}, deliveryChan chan kafka.Event) error

func (*Producer) ProduceFast

func (ap *Producer) ProduceFast(key interface{}, binaryValue []byte, deliveryChan chan kafka.Event) error

Produce will try to publish message to a topic. If deliveryChan is provided then function will return immediately, otherwise it will wait for delivery

type ProducerOption

type ProducerOption interface {
	// contains filtered or unexported methods
}

func WithBackoff

func WithBackoff(backOff backoff.BackOff) ProducerOption

func WithKafkaProducer

func WithKafkaProducer(producer KafkaProducer) ProducerOption

type SchemaRegistryClient

type SchemaRegistryClient interface {
	GetSchemaByID(id int) (avro.Schema, error)
	RegisterNewSchema(subject string, schema avro.Schema) (int, error)
}

type SharedOption

type SharedOption interface {
	ConsumerOption
	ProducerOption
}

func WithAvroAPI

func WithAvroAPI(api avro.API) SharedOption

func WithKafkaConfig

func WithKafkaConfig(cfg *kafka.ConfigMap) SharedOption

func WithSchemaRegistryClient

func WithSchemaRegistryClient(srClient SchemaRegistryClient) SharedOption

func WithSchemaRegistryURL

func WithSchemaRegistryURL(url *url.URL) SharedOption

type ValueFactory

type ValueFactory func(topic string) interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳