Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FanOut ¶
type FanOut[I any] struct { // contains filtered or unexported fields }
FanOut will fan out to `workerCount` total coroutines for processing.
func NewFanOut ¶
Example ¶
package main import ( "fmt" "sort" "github.com/bir/iken/worker" ) func main() { type Request struct { Name string Index int } type Reply struct { Name string Index int Size int } inputs := []Request{{"A", 0}, {"BBBB", 1}, {"CCCCCCCCCCC", 2}} w := worker.NewFanOut[Request](10, 0) go func() { // Call invoke once per input data. for _, i := range inputs { w.Invoke(i) } // Call worker.FanOut.Close when all inputs are loaded. w.Close() }() // Unbuffered reply channel, buffer size is a tunable parameter available to the implementation replies := make(chan Reply) go func() { // Process and close must be executed in a separate go routine, unless the reply channel // is sufficiently buffered. w.Process(func(r Request) { // Do the "work". In this example just get the size of the name. replies <- Reply{ Name: r.Name, Index: r.Index, Size: len(r.Name), } }) // When Process returns, all inputs have been handled. close(replies) }() var out []Reply for r := range replies { out = append(out, r) } // Sort the results in descending size sort.Slice(out, func(i int, j int) bool { return out[i].Size > out[j].Size }) fmt.Println(out) }
Output: [{CCCCCCCCCCC 2 11} {BBBB 1 4} {A 0 1}]
func (*FanOut[I]) Close ¶
func (f *FanOut[I]) Close()
Close closes the input channels. Invoke can not be called again after this call.
func (*FanOut[I]) Invoke ¶
func (f *FanOut[I]) Invoke(input I)
Invoke adds the data to the worker for processing.
func (*FanOut[I]) Process ¶
func (f *FanOut[I]) Process(p ProcessorFunc[I])
Process handles all inputs until the input channel is closed.
type HashFunc ¶
HashFunc converts an input to a uint value. It must be deterministic. See examples below.
type HashedFanOut ¶
type HashedFanOut[I any] struct { // contains filtered or unexported fields }
HashedFanOut is a special purpose fan out that hashes the input so that the same keys are always processed by the same worker. This is used to ensure guard against race conditions in non-reentrant processors.
func NewHashedFanOut ¶
func NewHashedFanOut[I any](workerCount, bufferSize uint, hasher HashFunc[I]) *HashedFanOut[I]
func (*HashedFanOut[I]) Close ¶
func (f *HashedFanOut[I]) Close()
func (*HashedFanOut[I]) Invoke ¶
func (f *HashedFanOut[I]) Invoke(input I)
func (*HashedFanOut[I]) Process ¶
func (f *HashedFanOut[I]) Process(p ProcessorFunc[I])
type ProcessorFunc ¶
type ProcessorFunc[I any] func(I)
Click to show internal directories.
Click to hide internal directories.