Documentation
Overview
Package errgroup simplifies common patterns of goroutine use, in particular making it straightforward to reliably wait on parallel or pipelined goroutines, exiting either when the first error is encountered or waiting for all goroutines to finish regardless of error outcome. Contexts are used to control cancelation. It is modeled on golang.org/x/sync/errgroup and other similar packages. It makes use of cloudeng.io/errors to simplify collecting multiple errors.
Types
type T
type T struct {
// contains filtered or unexported fields
}
T represents a set of goroutines working on a common, coordinated set of tasks.
T may be instantiated directly, in which case all goroutines will run to completion and all errors will be collected and made available via the Errors field and the return value of Wait. Alternatively, WithContext can be used to create a T with an embedded cancel function that will be called once, either when the first error occurs or when Wait is called. WithCancel behaves like WithContext but lets the caller create the context and supply its cancel function, which is required for working with context.WithDeadline and context.WithTimeout.
Example
package main

import (
	"fmt"
	"sort"
	"strings"

	"cloudeng.io/errors"
	"cloudeng.io/sync/errgroup"
)

func main() {
	// Wait for all goroutines to finish and catalogue all of their
	// errors.
	var g errgroup.T
	msg := []string{"a", "b", "c"}
	for _, m := range msg {
		m := m
		g.Go(func() error {
			return errors.New(m)
		})
	}
	err := g.Wait()
	if err == nil {
		fmt.Print("no errors - that's an error")
		return // avoid calling Error on a nil error below
	}
	// Sort the error messages for stable output.
	out := strings.Split(err.Error(), "\n")
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:
--- 1 of 3 errors
--- 2 of 3 errors
--- 3 of 3 errors
a
b
c
Example (Parallel)
package main

import (
	"fmt"
	"sort"
	"strings"

	"cloudeng.io/sync/errgroup"
)

func main() {
	// Execute a set of goroutines in parallel.
	var g errgroup.T
	msg := []string{"a", "b", "c"}
	out := make([]string, len(msg))
	for i, m := range msg {
		i, m := i, m
		g.Go(func() error {
			// Each goroutine writes to its own slot, so no locking is needed.
			out[i] = m
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("failed: %v", err)
	}
	// Sort the results for stable output.
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:
a
b
c
Example (Pipeline)
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"cloudeng.io/errors"
	"cloudeng.io/sync/errgroup"
)

func main() {
	// A pipeline to generate random numbers and measure the uniformity of
	// their distribution. The pipeline runs for 2 seconds.
	// The use of errgroup.T ensures that on return all of the goroutines
	// have completed and the channels used are closed.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	g := errgroup.WithCancel(cancel)
	numGenerators, numCounters := 4, 8
	numCh := make(chan int64)
	src := rand.New(rand.NewSource(1234))
	var srcMu sync.Mutex

	// numGenerators goroutines produce random numbers in the range of 0..99.
	for i := 0; i < numGenerators; i++ {
		g.Go(func() error {
			for {
				srcMu.Lock()
				n := src.Int63n(100)
				srcMu.Unlock()
				// Block until a counter accepts the number or the context
				// ends; there is no default case, so this does not busy-loop.
				select {
				case numCh <- n:
				case <-ctx.Done():
					err := ctx.Err()
					if errors.Is(err, context.DeadlineExceeded) {
						return nil
					}
					return err
				}
			}
		})
	}

	counters := make([]int64, 10)
	var total int64

	// numCounters consume the random numbers and count which decile
	// each one falls into.
	for i := 0; i < numCounters; i++ {
		g.Go(func() error {
			for {
				select {
				case num := <-numCh:
					atomic.AddInt64(&counters[num%10], 1)
					atomic.AddInt64(&total, 1)
				case <-ctx.Done():
					err := ctx.Err()
					if errors.Is(err, context.DeadlineExceeded) {
						return nil
					}
					return err
				}
			}
		})
	}

	go func() {
		if err := g.Wait(); err != nil {
			panic(err)
		}
		close(numCh)
	}()

	if err := g.Wait(); err != nil {
		fmt.Printf("failed: %v", err)
	}

	// After some time, measure the normalized number of random numbers
	// per decile with appropriate rounding. Print the distribution
	// to verify the expected values.
	for i, v := range counters {
		ratio := total / v
		if ratio >= 8 && ratio <= 12 {
			// 8..12 is close enough to an even distribution so round
			// it to 10.
			ratio = 10
		}
		fmt.Printf("%v: %v\n", i, ratio)
	}
}
Output:
0: 10
1: 10
2: 10
3: 10
4: 10
5: 10
6: 10
7: 10
8: 10
9: 10
func WithCancel
func WithCancel(cancel func()) *T
WithCancel returns a new T that will call the supplied cancel function once, either when a goroutine returns the first non-nil error or when Wait is called.
Example
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"cloudeng.io/sync/errgroup"
)

func main() {
	// Exit all goroutines when a deadline has passed.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now())
	g := errgroup.WithCancel(cancel)
	var msg = []string{"a", "b", "c"}
	for _, m := range msg {
		m := m
		g.Go(func() error {
			// Receive from the Done channel; it is already closed because
			// the deadline has passed.
			<-ctx.Done()
			return fmt.Errorf("%v: %w", m, ctx.Err())
		})
	}
	err := g.Wait()
	if err == nil {
		fmt.Print("no errors - that's an error")
		return // avoid calling Error on a nil error below
	}
	// Sort the error messages for stable output.
	out := strings.Split(err.Error(), "\n")
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:
--- 1 of 3 errors
--- 2 of 3 errors
--- 3 of 3 errors
a: context deadline exceeded
b: context deadline exceeded
c: context deadline exceeded
func WithConcurrency (added in v0.0.5)
WithConcurrency returns a new T that limits the number of goroutines to n. Note that the Go method will block when this limit is reached. A value of 0 for n means there is no limit on the number of goroutines.
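The blocking behavior described above is commonly built on a buffered channel used as a counting semaphore. The sketch below illustrates that technique with the standard library only; the limited type, newLimited, and maxInFlight are hypothetical names, and nothing here is claimed to match how this package implements its limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limited is a hypothetical stand-in for a concurrency-limited group.
type limited struct {
	wg  sync.WaitGroup
	sem chan struct{} // nil means no limit
}

// newLimited returns a group that runs at most n functions at once;
// n == 0 means unlimited, mirroring the documented convention.
func newLimited(n int) *limited {
	l := &limited{}
	if n > 0 {
		l.sem = make(chan struct{}, n)
	}
	return l
}

// Go blocks the caller while n functions are already running.
func (l *limited) Go(fn func()) {
	if l.sem != nil {
		l.sem <- struct{}{} // acquire a slot; blocks when none is free
	}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		if l.sem != nil {
			defer func() { <-l.sem }() // release the slot
		}
		fn()
	}()
}

// Wait blocks until every started function has returned.
func (l *limited) Wait() { l.wg.Wait() }

// maxInFlight runs the given number of tasks under limit n and reports the
// peak number that were running at the same time.
func maxInFlight(n, tasks int) int64 {
	var cur, peak int64
	l := newLimited(n)
	for i := 0; i < tasks; i++ {
		l.Go(func() {
			c := atomic.AddInt64(&cur, 1)
			for {
				p := atomic.LoadInt64(&peak)
				if c <= p || atomic.CompareAndSwapInt64(&peak, p, c) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			atomic.AddInt64(&cur, -1)
		})
	}
	l.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(maxInFlight(2, 10) <= 2) // prints "true"
}
```

Because the slot is acquired in the caller rather than inside the new goroutine, a loop that submits work is throttled automatically instead of spawning an unbounded number of waiting goroutines.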
func WithContext
WithContext returns a new T that will call the cancel function derived from the supplied context once, either when a goroutine returns the first non-nil error or when Wait is called.
Example
package main

import (
	"context"
	"fmt"
	"sort"
	"strings"

	"cloudeng.io/errors"
	"cloudeng.io/sync/errgroup"
)

func main() {
	// Terminate all remaining goroutines after a single error is encountered.
	g, ctx := errgroup.WithContext(context.Background())
	var msg = []string{"a", "b", "c"}
	for i, m := range msg {
		i, m := i, m
		g.Go(func() error {
			if i == 1 {
				return errors.New("first")
			}
			// Wait until the first error cancels the context.
			<-ctx.Done()
			return fmt.Errorf("%v: %w", m, ctx.Err())
		})
	}
	err := g.Wait()
	if err == nil {
		fmt.Print("no errors - that's an error")
		return // avoid calling Error on a nil error below
	}
	// Sort the error messages for stable output.
	out := strings.Split(err.Error(), "\n")
	sort.Strings(out)
	fmt.Println(strings.Join(out, "\n"))
}
Output:
--- 1 of 3 errors
--- 2 of 3 errors
--- 3 of 3 errors
a: context canceled
c: context canceled
first
func (*T) Go
Go runs the supplied function in a goroutine. If this group was created using WithConcurrency, Go will block until a goroutine is available.