kafkaavro

package module
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2022 License: Apache-2.0 Imports: 11 Imported by: 1

README

go-kafka-avro

A wrapper for Confluent's libraries for Apache Kafka and Schema Registry.

Installation

First install dependencies:

go get github.com/confluentinc/confluent-kafka-go github.com/landoop/schema-registry

To install use go get:

go get github.com/mycujoo/go-kafka-avro

Usage

By default this library would fetch configuration from environment variables. But you can customize everything using options.

Consumer
c, err := kafkaavro.NewConsumer(
    []string{"topic1"},
    func(topic string) interface{} {
        return val{}
    },
    kafkaavro.WithKafkaConfig(&kafka.ConfigMap{
        "bootstrap.servers":        "localhost:29092",
        "security.protocol":        "ssl",
        "socket.keepalive.enable":  true,
        "enable.auto.commit":       false,
        "ssl.key.location":         "/path/to/service.key",
        "ssl.certificate.location": "/path/to/service.cert",
        "ssl.ca.location":          "/path/to/ca.pem",
        "group.id":                 "some-group-id",
        "session.timeout.ms":       6000,
        "auto.offset.reset":        "earliest",
    }),
    kafkaavro.WithSchemaRegistryURL(srURL),
    kafkaavro.WithEventHandler(func(event kafka.Event) {
        log.Println(event)
    }),
)

for {
    msg, err := c.ReadMessage(5000)
    if err != nil {
        log.Println("Error", err)
        continue
    }
    if msg == nil {
        continue
    }
    switch v := msg.Value.(type) {
    case val:
        log.Println(v)
    }
}
Producer
producer, err := kafkaavro.NewProducer(
    "topic",
    `"string"`,
    `{"type": "record", "name": "test", "fields" : [{"name": "val", "type": "int", "default": 0}]}`,
)

Publish message using Produce method:

err = producer.Produce("key", "value", nil)

If you provide deliverChan then call will not be blocking until delivery.

Some code for cached schema registry client was based on https://github.com/dangkaka/go-kafka-avro implementation.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsErrFailedCommit

func IsErrFailedCommit(err error) bool

func IsErrInvalidValue

func IsErrInvalidValue(err error) bool

Types

type CachedSchemaRegistryClient

type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *schemaregistry.Client
	// contains filtered or unexported fields
}

CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance

func NewCachedSchemaRegistryClient

func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)

func (*CachedSchemaRegistryClient) DeleteSubject

func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)

DeleteSubject deletes the subject, should only be used in development

func (*CachedSchemaRegistryClient) GetLatestSchema

func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (avro.Schema, error)

GetLatestSchema returns the highest version schema for a subject

func (*CachedSchemaRegistryClient) GetSchemaByID

func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (avro.Schema, error)

GetSchemaByID will return and cache the schema with the given id

func (*CachedSchemaRegistryClient) GetSchemaBySubject

func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (avro.Schema, error)

GetSchemaBySubject returns the schema for a specific version of a subject

func (*CachedSchemaRegistryClient) IsSchemaRegistered

func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, schema avro.Schema) (bool, schemaregistry.Schema, error)

IsSchemaRegistered checks if a specific schema is already registered to a subject

func (*CachedSchemaRegistryClient) RegisterNewSchema

func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, schema avro.Schema) (int, error)

RegisterNewSchema will return and cache the id with the given schema

func (*CachedSchemaRegistryClient) Subjects

func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)

Subjects returns a list of subjects

func (*CachedSchemaRegistryClient) Versions

func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)

Versions returns a list of all versions of a subject

type Consumer

type Consumer struct {
	KafkaConsumer
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(topics []string, valueFactory ValueFactory, opts ...ConsumerOption) (*Consumer, error)

NewConsumer is a basic consumer to interact with schema registry, avro and kafka

Example
package main

import (
	"log"
	"net/url"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkaavro "github.com/mycujoo/go-kafka-avro/v2"
)

func main() {
	srURL, err := url.Parse("http://localhost:8081")
	if err != nil {
		log.Fatal(err)
	}

	type val struct {
		FieldName string `avro:"field_name"`
	}

	c, err := kafkaavro.NewConsumer(
		[]string{"topic1"},
		func(topic string) interface{} {
			return val{}
		},
		kafkaavro.WithKafkaConfig(&kafka.ConfigMap{
			"bootstrap.servers":        "localhost:29092",
			"security.protocol":        "ssl",
			"socket.keepalive.enable":  true,
			"enable.auto.commit":       false,
			"ssl.key.location":         "/path/to/service.key",
			"ssl.certificate.location": "/path/to/service.cert",
			"ssl.ca.location":          "/path/to/ca.pem",
			"group.id":                 "some-group-id",
			"session.timeout.ms":       6000,
			"auto.offset.reset":        "earliest",
		}),
		kafkaavro.WithSchemaRegistryURL(srURL),
		kafkaavro.WithEventHandler(func(event kafka.Event) {
			log.Println(event)
		}),
	)

	for {
		msg, err := c.ReadMessage(5000)
		if err != nil {
			log.Println("Error", err)
			continue
		}
		if msg == nil {
			continue
		}
		switch v := msg.Value.(type) {
		case val:
			log.Println(v)
		}
	}

}
Output:

func (*Consumer) EnsureTopics

func (ac *Consumer) EnsureTopics(topics []string) error

EnsureTopics returns error if one of the consumed topics was not found on the server.

func (*Consumer) FetchMessage

func (ac *Consumer) FetchMessage(timeoutMs int) (*Message, error)

func (*Consumer) ReadMessage

func (ac *Consumer) ReadMessage(timeoutMs int) (*Message, error)

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

func WithEventHandler

func WithEventHandler(handler EventHandler) ConsumerOption

func WithKafkaConsumer

func WithKafkaConsumer(consumer KafkaConsumer) ConsumerOption

func WithoutTopicsCheck

func WithoutTopicsCheck() ConsumerOption

type ErrFailedCommit

type ErrFailedCommit struct {
	Err error
}

func (ErrFailedCommit) Error

func (e ErrFailedCommit) Error() string

func (ErrFailedCommit) Unwrap

func (e ErrFailedCommit) Unwrap() error

type ErrInvalidValue

type ErrInvalidValue struct {
	Topic string
}

func (ErrInvalidValue) Error

func (e ErrInvalidValue) Error() string

type EventHandler

type EventHandler func(event kafka.Event)

type KafkaConsumer

type KafkaConsumer interface {
	CommitMessage(m *kafka.Message) ([]kafka.TopicPartition, error)
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	Poll(timeoutMs int) kafka.Event
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
}

type KafkaProducer

type KafkaProducer interface {
	Close()
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
}

type Message

type Message struct {
	*kafka.Message
	Value interface{}
}

type Producer

type Producer struct {
	KafkaProducer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(
	topicName string,
	keySchemaJSON, valueSchemaJSON string,
	opts ...ProducerOption,
) (*Producer, error)

NewProducer is a producer that publishes messages to kafka topic using avro serialization format

func (*Producer) Produce

func (ap *Producer) Produce(key interface{}, value interface{}, deliveryChan chan kafka.Event) error

type ProducerOption

type ProducerOption interface {
	// contains filtered or unexported methods
}

func WithBackoff

func WithBackoff(backOff backoff.BackOff) ProducerOption

func WithKafkaProducer

func WithKafkaProducer(producer KafkaProducer) ProducerOption

type SchemaRegistryClient

type SchemaRegistryClient interface {
	GetSchemaByID(id int) (avro.Schema, error)
	RegisterNewSchema(subject string, schema avro.Schema) (int, error)
}

type SharedOption

type SharedOption interface {
	ConsumerOption
	ProducerOption
}

func WithAvroAPI

func WithAvroAPI(api avro.API) SharedOption

func WithKafkaConfig

func WithKafkaConfig(cfg *kafka.ConfigMap) SharedOption

func WithSchemaRegistryClient

func WithSchemaRegistryClient(srClient SchemaRegistryClient) SharedOption

func WithSchemaRegistryURL

func WithSchemaRegistryURL(url *url.URL) SharedOption

type ValueFactory

type ValueFactory func(topic string) interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳