consumer

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Code generated by options-gen. DO NOT EDIT.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AlwaysFailDecoder added in v0.3.0

type AlwaysFailDecoder struct{}

AlwaysFailDecoder will always return error. Useful for testing and HeaderDependantDecoder.DefaultDecoder.

func (*AlwaysFailDecoder) Decode added in v0.3.0

func (d *AlwaysFailDecoder) Decode(_ []Header, _ []byte, _ any) error

type Consumer

type Consumer[T any] struct {
	// contains filtered or unexported fields
}

func New

func New[T any](opts Options) (*Consumer[T], error)

func (*Consumer[T]) C

func (c *Consumer[T]) C() <-chan Msg[T]

C return a chan of messages from kafka.

func (*Consumer[T]) Run

func (c *Consumer[T]) Run(ctx context.Context)
type Header = kafka.Header

type HeaderDependantDecoder added in v0.3.0

type HeaderDependantDecoder struct {
	// Name of header that we will check.
	HeaderName string
	// Header value to Decoder mapping.
	Decoders map[string]IDecoder
	// Will be used in case if message have no headers, or we have no corresponding mapping in Decoders map.
	DefaultDecoder IDecoder
}

HeaderDependantDecoder allows you to specify which decoder will be used to decode each message.

func (*HeaderDependantDecoder) Decode added in v0.3.0

func (d *HeaderDependantDecoder) Decode(headers []Header, src []byte, res any) error

type IDecoder added in v0.2.0

type IDecoder interface {
	Decode(headers []Header, value []byte, dst any) error
}

type ILogger

type ILogger interface {
	WarnContext(ctx context.Context, msg string, attrs ...any)
	ErrorContext(ctx context.Context, msg string, attrs ...any)
}

type JSONDecoder added in v0.2.0

type JSONDecoder struct{}

func (*JSONDecoder) Decode added in v0.2.0

func (d *JSONDecoder) Decode(_ []Header, src []byte, res any) error

type Msg

type Msg[T any] struct {
	Partition     int
	Offset        int64
	HighWaterMark int64
	Key           []byte
	Value         T
	Headers       []Header
	Time          time.Time
}

type OptOptionsSetter

type OptOptionsSetter func(o *Options)

func WithBrokers

func WithBrokers(opt []string) OptOptionsSetter

func WithCommitInterval

func WithCommitInterval(opt time.Duration) OptOptionsSetter

0 - means sync mode.

func WithConsGroup

func WithConsGroup(opt string) OptOptionsSetter

func WithDecoder added in v0.3.0

func WithDecoder(opt IDecoder) OptOptionsSetter

func WithLogger

func WithLogger(opt ILogger) OptOptionsSetter

func WithMaxBytes

func WithMaxBytes(opt int) OptOptionsSetter

func WithMaxWait

func WithMaxWait(opt time.Duration) OptOptionsSetter

func WithMinBytes

func WithMinBytes(opt int) OptOptionsSetter

func WithName

func WithName(opt string) OptOptionsSetter

func WithRetentionTime

func WithRetentionTime(opt time.Duration) OptOptionsSetter

consumer group retention time. consumer group will be keep in kafka for this period.

func WithTlsConfig

func WithTlsConfig(opt *tls.Config) OptOptionsSetter

func WithTopic

func WithTopic(opt string) OptOptionsSetter

type Options

type Options struct {
	// contains filtered or unexported fields
}

func NewOptions

func NewOptions(
	options ...OptOptionsSetter,
) Options

func (*Options) Validate

func (o *Options) Validate() error

type ProtoJSONDecoder added in v0.2.0

type ProtoJSONDecoder struct{}

func (*ProtoJSONDecoder) Decode added in v0.2.0

func (d *ProtoJSONDecoder) Decode(_ []Header, src []byte, res any) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL
JackTT - Gopher 🇻🇳