Documentation
¶
Index ¶
- func IsErrFailedCommit(err error) bool
- func IsErrInvalidValue(err error) bool
- type CachedSchemaRegistryClient
- func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
- func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (avro.Schema, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (avro.Schema, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (avro.Schema, error)
- func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, schema avro.Schema) (bool, schemaregistry.Schema, error)
- func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, schema avro.Schema) (int, error)
- func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
- func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)
- type Consumer
- type ConsumerOption
- type ErrFailedCommit
- type ErrInvalidValue
- type EventHandler
- type KafkaConsumer
- type KafkaProducer
- type Message
- type Producer
- type ProducerOption
- type SchemaRegistryClient
- type SharedOption
- type ValueFactory
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsErrFailedCommit ¶
func IsErrInvalidValue ¶
Types ¶
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct { SchemaRegistryClient *schemaregistry.Client // contains filtered or unexported fields }
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)
func (*CachedSchemaRegistryClient) DeleteSubject ¶
func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
DeleteSubject deletes the subject, should only be used in development
func (*CachedSchemaRegistryClient) GetLatestSchema ¶
func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (avro.Schema, error)
GetLatestSchema returns the highest version schema for a subject
func (*CachedSchemaRegistryClient) GetSchemaByID ¶
func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (avro.Schema, error)
GetSchemaByID will return and cache the schema with the given id
func (*CachedSchemaRegistryClient) GetSchemaBySubject ¶
func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (avro.Schema, error)
GetSchemaBySubject returns the schema for a specific version of a subject
func (*CachedSchemaRegistryClient) IsSchemaRegistered ¶
func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, schema avro.Schema) (bool, schemaregistry.Schema, error)
IsSchemaRegistered checks if a specific schema is already registered to a subject
func (*CachedSchemaRegistryClient) RegisterNewSchema ¶
func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, schema avro.Schema) (int, error)
RegisterNewSchema will return and cache the id with the given schema
func (*CachedSchemaRegistryClient) Subjects ¶
func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
Subjects returns a list of subjects
type Consumer ¶
type Consumer struct { KafkaConsumer // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer(topics []string, valueFactory ValueFactory, opts ...ConsumerOption) (*Consumer, error)
NewConsumer is a basic consumer to interact with schema registry, avro and kafka
Example ¶
package main import ( "log" "net/url" "github.com/confluentinc/confluent-kafka-go/kafka" kafkaavro "github.com/mycujoo/go-kafka-avro/v2" ) func main() { srURL, err := url.Parse("http://localhost:8081") if err != nil { log.Fatal(err) } type val struct { FieldName string `avro:"field_name"` } c, err := kafkaavro.NewConsumer( []string{"topic1"}, func(topic string) interface{} { return val{} }, kafkaavro.WithKafkaConfig(&kafka.ConfigMap{ "bootstrap.servers": "localhost:29092", "security.protocol": "ssl", "socket.keepalive.enable": true, "enable.auto.commit": false, "ssl.key.location": "/path/to/service.key", "ssl.certificate.location": "/path/to/service.cert", "ssl.ca.location": "/path/to/ca.pem", "group.id": "some-group-id", "session.timeout.ms": 6000, "auto.offset.reset": "earliest", }), kafkaavro.WithSchemaRegistryURL(srURL), kafkaavro.WithEventHandler(func(event kafka.Event) { log.Println(event) }), ) for { msg, err := c.ReadMessage(5000) if err != nil { log.Println("Error", err) continue } if msg == nil { continue } switch v := msg.Value.(type) { case val: log.Println(v) } } }
Output:
func (*Consumer) EnsureTopics ¶
EnsureTopics returns error if one of the consumed topics was not found on the server.
type ConsumerOption ¶
type ConsumerOption interface {
// contains filtered or unexported methods
}
func WithEventHandler ¶
func WithEventHandler(handler EventHandler) ConsumerOption
func WithKafkaConsumer ¶
func WithKafkaConsumer(consumer KafkaConsumer) ConsumerOption
func WithoutTopicsCheck ¶
func WithoutTopicsCheck() ConsumerOption
type ErrFailedCommit ¶
type ErrFailedCommit struct {
Err error
}
func (ErrFailedCommit) Error ¶
func (e ErrFailedCommit) Error() string
func (ErrFailedCommit) Unwrap ¶
func (e ErrFailedCommit) Unwrap() error
type ErrInvalidValue ¶
type ErrInvalidValue struct {
Topic string
}
func (ErrInvalidValue) Error ¶
func (e ErrInvalidValue) Error() string
type EventHandler ¶
type KafkaConsumer ¶
type KafkaProducer ¶
type Producer ¶
type Producer struct { KafkaProducer // contains filtered or unexported fields }
func NewProducer ¶
func NewProducer( topicName string, keySchemaJSON, valueSchemaJSON string, opts ...ProducerOption, ) (*Producer, error)
NewProducer is a producer that publishes messages to kafka topic using avro serialization format
type ProducerOption ¶
type ProducerOption interface {
// contains filtered or unexported methods
}
func WithBackoff ¶
func WithBackoff(backOff backoff.BackOff) ProducerOption
func WithKafkaProducer ¶
func WithKafkaProducer(producer KafkaProducer) ProducerOption
type SchemaRegistryClient ¶
type SharedOption ¶
type SharedOption interface { ConsumerOption ProducerOption }
func WithAvroAPI ¶
func WithAvroAPI(api avro.API) SharedOption
func WithKafkaConfig ¶
func WithKafkaConfig(cfg *kafka.ConfigMap) SharedOption
func WithSchemaRegistryClient ¶
func WithSchemaRegistryClient(srClient SchemaRegistryClient) SharedOption
func WithSchemaRegistryURL ¶
func WithSchemaRegistryURL(url *url.URL) SharedOption
type ValueFactory ¶
type ValueFactory func(topic string) interface{}