Documentation
¶
Overview ¶
Package batching provides mechanisms for batching writes of various types. A batcher's methods should be invoked from a single goroutine. It is the responsibility of the caller to invoke Flush on the batcher frequently to flush the current batch out to the writer.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher will accept messages and invoke the Writer when the batch requirements have been fulfilled (either batch size or interval have been exceeded). Batcher should be created with NewBatcher().
func NewBatcher ¶
NewBatcher creates a new Batcher. It is recommenended to use a wrapper type such as NewByteBatcher or NewV2EnvelopeBatcher vs using this directly.
func (*Batcher) Flush ¶
func (b *Batcher) Flush()
Flush will write a partial batch if there is data and the interval has lapsed. Otherwise it is a NOP. This method should be called freqently to make sure batches do not stick around for long periods of time. As a result it would be a bad idea to call Flush after an operation that might block for an un-specified amount of time. NOTE: Flush is *not* thread safe and should be called by the same goroutine that calls Write.
func (*Batcher) ForcedFlush ¶
func (b *Batcher) ForcedFlush()
ForcedFlush bypasses the batch interval and batch size checks and writes immediately.
func (*Batcher) Write ¶
func (b *Batcher) Write(data interface{})
Write stores data to the batch. It will not submit the batch to the writer until either the batch has been filled, or the interval has lapsed. NOTE: Write is *not* thread safe and should be called by the same goroutine that calls Flush.
type ByteBatcher ¶
type ByteBatcher struct {
*Batcher
}
ByteBatcher batches slices of bytes.
Example ¶
writer := batching.ByteWriterFunc(func(batch [][]byte) { for _, data := range batch { fmt.Printf("%s\n", data) } }) batcher := batching.NewByteBatcher(100, time.Nanosecond, writer) dataSource := make(chan []byte) done := make(chan struct{}) go func() { defer close(done) for i := 0; i < 3; i++ { dataSource <- []byte(fmt.Sprintf("data %d", i)) } }() for { // Do a non-blocking read from a data source. select { case data := <-dataSource: // If read succeeds write it out. This will flush if the batch // exceeds the batch size. batcher.Write(data) case <-done: return default: // If read fails make sure to call Flush to ensure data doesn't // get stuck in the batch for long periods of time. batcher.Flush() } }
Output: data 0 data 1 data 2
func NewByteBatcher ¶
func NewByteBatcher(size int, interval time.Duration, writer ByteWriter) *ByteBatcher
NewByteBatcher creates a new ByteBatcher.
func (*ByteBatcher) Write ¶
func (b *ByteBatcher) Write(data []byte)
Write stores data to the batch. It will not submit the batch to the writer until either the batch has been filled, or the interval has lapsed. NOTE: Write is *not* thread safe and should be called by the same goroutine that calls Flush.
type ByteWriter ¶
type ByteWriter interface { // Write submits the batch. Write(batch [][]byte) }
ByteWriter is used to submit the completed batch of slices of bytes. The batch may be partial if the interval lapsed instead of filling the batch.
type ByteWriterFunc ¶
type ByteWriterFunc func(batch [][]byte)
ByteWriterFunc is an adapter to allow ordinary functions to be a ByteWriter.
func (ByteWriterFunc) Write ¶
func (f ByteWriterFunc) Write(batch [][]byte)
Write implements ByteWriter.
type Writer ¶
type Writer interface {
// Write submits the batch.
Write(batch []interface{})
}
Writer is used to submit the completed batch. The batch may be partial if the interval lapsed instead of filling the batch.
type WriterFunc ¶
type WriterFunc func(batch []interface{})
WriterFunc is an adapter to allow ordinary functions to be a Writer.