Documentation
¶
Index ¶
- func IsErrFailedCommit(err error) bool
- func IsErrInvalidValue(err error) bool
- type CachedSchemaRegistryClient
- func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
- func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (avro.Schema, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (avro.Schema, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (avro.Schema, error)
- func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, schema avro.Schema) (bool, schemaregistry.Schema, error)
- func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, schema avro.Schema) (int, error)
- func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
- func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)
- type Consumer
- type ConsumerOption
- type ErrFailedCommit
- type ErrInvalidValue
- type EventHandler
- type KafkaConsumer
- type KafkaProducer
- type Message
- type Producer
- type ProducerOption
- type SchemaRegistryClient
- type SharedOption
- type ValueFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsErrFailedCommit ¶
func IsErrInvalidValue ¶
Types ¶
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *schemaregistry.Client
	// contains filtered or unexported fields
}
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)
func (*CachedSchemaRegistryClient) DeleteSubject ¶
func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
DeleteSubject deletes the subject. It should only be used in development.
func (*CachedSchemaRegistryClient) GetLatestSchema ¶
func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (avro.Schema, error)
GetLatestSchema returns the highest version schema for a subject
func (*CachedSchemaRegistryClient) GetSchemaByID ¶
func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (avro.Schema, error)
GetSchemaByID will return and cache the schema with the given id
func (*CachedSchemaRegistryClient) GetSchemaBySubject ¶
func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (avro.Schema, error)
GetSchemaBySubject returns the schema for a specific version of a subject
func (*CachedSchemaRegistryClient) IsSchemaRegistered ¶
func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, schema avro.Schema) (bool, schemaregistry.Schema, error)
IsSchemaRegistered checks if a specific schema is already registered to a subject
func (*CachedSchemaRegistryClient) RegisterNewSchema ¶
func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, schema avro.Schema) (int, error)
RegisterNewSchema will return and cache the id for the given schema.
func (*CachedSchemaRegistryClient) Subjects ¶
func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
Subjects returns a list of subjects
type Consumer ¶
type Consumer struct {
	KafkaConsumer
	// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(topics []string, valueFactory ValueFactory, opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates a basic consumer that interacts with the schema registry, Avro and Kafka.
func (*Consumer) EnsureTopics ¶
EnsureTopics returns an error if one of the consumed topics was not found on the server.
type ConsumerOption ¶
type ConsumerOption interface {
// contains filtered or unexported methods
}
func WithEventHandler ¶
func WithEventHandler(handler EventHandler) ConsumerOption
func WithKafkaConsumer ¶
func WithKafkaConsumer(consumer KafkaConsumer) ConsumerOption
func WithoutTopicsCheck ¶
func WithoutTopicsCheck() ConsumerOption
type ErrFailedCommit ¶
type ErrFailedCommit struct {
Err error
}
func (ErrFailedCommit) Error ¶
func (e ErrFailedCommit) Error() string
func (ErrFailedCommit) Unwrap ¶
func (e ErrFailedCommit) Unwrap() error
type ErrInvalidValue ¶
type ErrInvalidValue struct {
Topic string
}
func (ErrInvalidValue) Error ¶
func (e ErrInvalidValue) Error() string
type EventHandler ¶
type KafkaConsumer ¶
type KafkaProducer ¶
type Producer ¶
type Producer struct {
	KafkaProducer
	// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(
	topicName string,
	partition int,
	keySchemaJSON, valueSchemaJSON string,
	opts ...ProducerOption,
) (*Producer, error)
NewProducer creates a producer that publishes messages to a Kafka topic using the Avro serialization format.
type ProducerOption ¶
type ProducerOption interface {
// contains filtered or unexported methods
}
func WithBackoff ¶
func WithBackoff(backOff backoff.BackOff) ProducerOption
func WithKafkaProducer ¶
func WithKafkaProducer(producer KafkaProducer) ProducerOption
type SchemaRegistryClient ¶
type SharedOption ¶
type SharedOption interface {
	ConsumerOption
	ProducerOption
}
func WithAvroAPI ¶
func WithAvroAPI(api avro.API) SharedOption
func WithKafkaConfig ¶
func WithKafkaConfig(cfg *kafka.ConfigMap) SharedOption
func WithSchemaRegistryClient ¶
func WithSchemaRegistryClient(srClient SchemaRegistryClient) SharedOption
func WithSchemaRegistryURL ¶
func WithSchemaRegistryURL(url *url.URL) SharedOption
type ValueFactory ¶
type ValueFactory func(topic string) interface{}