Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsFatalErr ¶ added in v0.2.2
func NewFatalErr ¶ added in v0.2.2
NewFatalErr wraps the original err to create a FatalErr.
Types ¶
type BatchHandler ¶
type BatchHandler interface {
	Handler
	// Batch processes the results returned by Handler.Handle.
	Batch(ctx context.Context, data []interface{}) error
}
BatchHandler is a Handler with one additional method, Batch.
type FatalErr ¶ added in v0.2.2
type FatalErr interface {
Fatal() bool
}
FatalErr marks an error as fatal. Return an error that implements it to make the processor exit.
type HandleFunc ¶
HandleFunc is the function type of Handler.Handle.
type Handler ¶
type Handler interface {
	// Info sets the topic name and some config.
	Info() *Info
	// Handle processes a *kafka.Message.
	Handle(ctx context.Context, msg *kafka.Message) (interface{}, error)
}
Handler only includes the Info and Handle methods.
type Info ¶
type Info struct {
	// Name is used to get the reader from otkafka.ReaderMaker.
	// default: "default"
	Name string
	// ReadWorker is the number of reader workers.
	// default: 1
	ReadWorker int
	// BatchWorker is the number of batch workers.
	// default: 1
	BatchWorker int
	// BatchSize is the data size for batch processing.
	// default: 1
	BatchSize int
	// HandleWorker is the number of handler workers.
	HandleWorker int
	// ChanSize is the size of the data channel.
	// default: 100
	ChanSize int
	// AutoBatchInterval runs the batch func automatically at the specified
	// interval, so that pending data is still processed when BatchSize is
	// not reached.
	// default: 30s
	AutoBatchInterval time.Duration
}
Info is the configuration of a BatchHandler.
Note:
If processing order matters, make sure that each worker count is one. Multiple goroutines cannot guarantee the order in which data is processed.
type Out ¶
Out provides Handlers to in.Handlers.
func NewOut ¶
NewOut creates an Out that provides Handlers to in.Handlers.
Usage:
func newHandlerA(logger log.Logger) processor.Out {
	return processor.NewOut(
		&HandlerA{logger: logger},
	)
}
Or:
func newHandlers(logger log.Logger) processor.Out {
	return processor.NewOut(
		&HandlerA{logger: logger},
		&HandlerB{logger: logger},
	)
}
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor dispatches messages to the registered Handlers.
func (*Processor) ProvideRunGroup ¶
ProvideRunGroup runs the workers:
- Fetch messages from *kafka.Reader.
- Handle each message with Handler.Handle.
- Batch the handled data with BatchHandler.Batch. If the batch succeeds, commit the messages.