Documentation
¶
Index ¶
- func DecodeRecordFromNative(src interface{}, dst interface{}) error
- type CachedSchemaRegistryClient
- func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
- func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (*goavro.Codec, error)
- func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (*goavro.Codec, error)
- func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (bool, schemaregistry.Schema, error)
- func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, codec *goavro.Codec) (int, error)
- func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
- func (cached *CachedSchemaRegistryClient) Versions(subject string) ([]int, error)
- type Consumer
- func (ac *Consumer) Close()
- func (ac *Consumer) CommitMessage(msg ConsumerMessage) ([]kafka.TopicPartition, error)
- func (ac *Consumer) EnsureTopics() error
- func (ac *Consumer) Messages(stopChan chan struct{}) (chan ConsumerMessage, chan kafka.Event)
- func (ac *Consumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
- type ConsumerMessage
- type NativeDecoder
- type OptionalDay
- type OptionalInt
- type OptionalString
- type Producer
- type ProducerConfig
- type SchemaRegistryClient
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DecodeRecordFromNative ¶ added in v0.10.0
func DecodeRecordFromNative(src interface{}, dst interface{}) error
Types ¶
type CachedSchemaRegistryClient ¶
type CachedSchemaRegistryClient struct {
	SchemaRegistryClient *schemaregistry.Client
	// contains filtered or unexported fields
}
CachedSchemaRegistryClient is a schema registry client that will cache some data to improve performance
func NewCachedSchemaRegistryClient ¶
func NewCachedSchemaRegistryClient(baseURL string, options ...schemaregistry.Option) (*CachedSchemaRegistryClient, error)
func (*CachedSchemaRegistryClient) DeleteSubject ¶
func (cached *CachedSchemaRegistryClient) DeleteSubject(subject string) (versions []int, err error)
DeleteSubject deletes the subject; it should only be used in development.
func (*CachedSchemaRegistryClient) GetLatestSchema ¶
func (cached *CachedSchemaRegistryClient) GetLatestSchema(subject string) (*goavro.Codec, error)
GetLatestSchema returns the highest version schema for a subject
func (*CachedSchemaRegistryClient) GetSchemaByID ¶
func (cached *CachedSchemaRegistryClient) GetSchemaByID(id int) (*goavro.Codec, error)
GetSchemaByID will return and cache the codec with the given id
func (*CachedSchemaRegistryClient) GetSchemaBySubject ¶
func (cached *CachedSchemaRegistryClient) GetSchemaBySubject(subject string, version int) (*goavro.Codec, error)
GetSchemaBySubject returns the codec for a specific version of a subject
func (*CachedSchemaRegistryClient) IsSchemaRegistered ¶
func (cached *CachedSchemaRegistryClient) IsSchemaRegistered(subject string, codec *goavro.Codec) (bool, schemaregistry.Schema, error)
IsSchemaRegistered checks if a specific codec is already registered to a subject
func (*CachedSchemaRegistryClient) RegisterNewSchema ¶
func (cached *CachedSchemaRegistryClient) RegisterNewSchema(subject string, codec *goavro.Codec) (int, error)
RegisterNewSchema will return and cache the id with the given codec
func (*CachedSchemaRegistryClient) Subjects ¶
func (cached *CachedSchemaRegistryClient) Subjects() ([]string, error)
Subjects returns a list of subjects
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(topics []string, consumer *kafka.Consumer, schemaRegistryClient SchemaRegistryClient) (*Consumer, error)
NewConsumer creates a basic consumer that interacts with schema registry, Avro and Kafka.
Example ¶
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	kafkaavro "github.com/mycujoo/go-kafka-avro"
)

func main() {
	kafkaConsumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:29092",
		"security.protocol":        "ssl",
		"socket.keepalive.enable":  true,
		"enable.auto.commit":       false,
		"ssl.key.location":         "/path/to/service.key",
		"ssl.certificate.location": "/path/to/service.cert",
		"ssl.ca.location":          "/path/to/ca.pem",
		"group.id":                 "some-group-id",
		"session.timeout.ms":       6000,
		"default.topic.config":     kafka.ConfigMap{"auto.offset.reset": "earliest"},
	})
	if err != nil {
		log.Fatal(err)
	}

	cachedSchemaRegistry, err := kafkaavro.NewCachedSchemaRegistryClient("http://localhost:8081")
	if err != nil {
		log.Fatal(err)
	}

	kafkaavro.NewConsumer([]string{"topic1"}, kafkaConsumer, cachedSchemaRegistry)
}
Output:
func (*Consumer) CommitMessage ¶
func (ac *Consumer) CommitMessage(msg ConsumerMessage) ([]kafka.TopicPartition, error)
func (*Consumer) EnsureTopics ¶ added in v0.11.0
func (ac *Consumer) EnsureTopics() error
EnsureTopics returns an error if one of the consumed topics was not found on the server.
func (*Consumer) Messages ¶
func (ac *Consumer) Messages(stopChan chan struct{}) (chan ConsumerMessage, chan kafka.Event)
Messages returns the ConsumerMessage channel (which contains decoded messages) and a second channel that carries other events, such as kafka.PartitionEOF and kafka.Stats.
func (*Consumer) SubscribeTopics ¶ added in v0.12.0
func (ac *Consumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
type ConsumerMessage ¶
type NativeDecoder ¶ added in v0.10.0
type NativeDecoder interface {
FromNative(interface{}) error
}
type OptionalDay ¶ added in v0.10.0
func NewOptionalDay ¶ added in v0.10.0
func NewOptionalDay(t time.Time, valid bool) OptionalDay
func (*OptionalDay) FromNative ¶ added in v0.10.0
func (od *OptionalDay) FromNative(data interface{}) error
func (OptionalDay) MarshalJSON ¶ added in v0.10.0
func (od OptionalDay) MarshalJSON() ([]byte, error)
type OptionalInt ¶ added in v0.10.0
type OptionalInt struct {
null.Int
}
func NewOptionalInt ¶ added in v0.10.0
func NewOptionalInt(i int64, valid bool) OptionalInt
func (*OptionalInt) FromNative ¶ added in v0.10.0
func (i *OptionalInt) FromNative(data interface{}) error
type OptionalString ¶ added in v0.10.0
type OptionalString struct {
null.String
}
func NewOptionalString ¶ added in v0.10.0
func NewOptionalString(s string, valid bool) OptionalString
func (*OptionalString) FromNative ¶ added in v0.10.0
func (s *OptionalString) FromNative(data interface{}) error
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(cfg ProducerConfig) (*Producer, error)
NewProducer creates a producer that publishes messages to a Kafka topic using the Avro serialization format.
type ProducerConfig ¶ added in v1.2.0
type ProducerConfig struct {
	// Name of the topic where messages will be produced
	TopicName string

	// Avro schema for message key
	KeySchema string

	// Avro schema for message value
	ValueSchema string

	// Low level kafka producer used to produce messages
	Producer kafkaProducer

	// Schema registry client used for messages validation and schema management
	SchemaRegistryClient SchemaRegistryClient

	// BackOffConfig is used for setting backoff strategy for retry logic
	BackOffConfig backoff.BackOff
}