Commit 515a6d14 authored by Kirill Smelkov's avatar Kirill Smelkov

xsync: New package with xsync.WorkGroup

Add xsync.WorkGroup that is similar to https://godoc.org/golang.org/x/sync/errgroup,
but amends it with a bit better (imho) design where work context is
explicitly passed into worker (see also [1]). This mirrors sync.WorkGroup that we
already have at Pygolang side [2].

[1] https://github.com/golang/go/issues/34510
[2] https://pypi.org/project/pygolang/#concurrency
parent 37584a0e
// Copyright (C) 2019-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xsync complements standard package sync.
//
// - `WorkGroup` allows to spawn group of goroutines working on a common task.
//
// Functionality provided by xsync package is also provided by Pygolang in its
// standard package sync.
package xsync
import (
"context"
"sync"
)
// WorkGroup represents group of goroutines working on a common task.
//
// Use .Go() to spawn goroutines, and .Wait() to wait for all of them to
// complete, for example:
//
// wg := xsync.NewWorkGroup(ctx)
// wg.Go(f1)
// wg.Go(f2)
// err := wg.Wait()
//
// Every spawned function accepts context related to the whole work and derived
// from ctx used to initialize WorkGroup, for example:
//
// func f1(ctx context.Context) error {
// ...
// }
//
// Whenever a function returns error, the work context is canceled indicating
// to other spawned goroutines that they have to cancel their work. .Wait()
// waits for all spawned goroutines to complete and returns error, if any, from
// the first failed subtask.
//
// NOTE if spawned function panics, the panic is currently _not_ propagated to .Wait().
//
// WorkGroup is modelled after https://godoc.org/golang.org/x/sync/errgroup but
// is not equal to it.
type WorkGroup struct {
ctx context.Context // workers are spawned under ctx
cancel func() // aborts ctx
waitg sync.WaitGroup // wait group for workers
mu sync.Mutex
err error // error of the first failed worker
}
// NewWorkGroup creates new WorkGroup working under ctx.
//
// See WorkGroup documentation for details.
func NewWorkGroup(ctx context.Context) *WorkGroup {
g := &WorkGroup{}
g.ctx, g.cancel = context.WithCancel(ctx)
return g
}
// Go spawns new worker under workgroup.
//
// See WorkGroup documentation for details.
func (g *WorkGroup) Go(f func(context.Context) error) {
g.waitg.Add(1)
go func() {
defer g.waitg.Done()
err := f(g.ctx)
if err == nil {
return
}
g.mu.Lock()
defer g.mu.Unlock()
if g.err == nil {
// this goroutine is the first failed task
g.err = err
g.cancel()
}
}()
}
// Wait waits for all spawned workers to complete.
//
// It returns the error, if any, from the first failed worker.
// See WorkGroup documentation for details.
func (g *WorkGroup) Wait() error {
g.waitg.Wait()
g.cancel()
return g.err
}
// Copyright (C) 2019-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xsync
import (
"context"
"fmt"
"reflect"
"testing"
)
func TestWorkGroup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_ = cancel
var l []int // l[i] is data state for i'th worker
var wg *WorkGroup
// xwait waits for wg to complete, and asserts on returned error and state of l
xwait := func(eok string, lok ...int) {
t.Helper()
err := wg.Wait()
if eok == "" {
if err != nil {
t.Fatalf("xwait: failed: %q", err)
}
} else {
estr := ""
if err != nil {
estr = err.Error()
}
if estr != eok {
t.Fatalf("xwait: unexpected errror:\nhave: %q\nwant: %q", estr, eok)
}
}
if !reflect.DeepEqual(l, lok) {
t.Fatalf("xwait: unexpected l:\nhave: %v\nwant: %v", l, lok)
}
}
// t1=ok, t2=ok
wg = NewWorkGroup(ctx)
l = []int{0, 0}
for i := 0; i < 2; i++ {
i := i
wg.Go(func(ctx context.Context) error {
l[i] = i+1
return nil
})
}
xwait("", 1, 2)
// t1=fail, t2=ok, does not look at ctx
wg = NewWorkGroup(ctx)
l = []int{0, 0}
for i := 0; i < 2; i++ {
i := i
wg.Go(func(ctx context.Context) error {
l[i] = i+1
if i == 0 {
return fmt.Errorf("aaa")
}
return nil
})
}
xwait("aaa", 1, 2)
// t1=fail, t2=wait cancel, fail
wg = NewWorkGroup(ctx)
l = []int{0, 0}
for i := 0; i < 2; i++ {
i := i
wg.Go(func(ctx context.Context) error {
l[i] = i+1
if i == 0 {
return fmt.Errorf("bbb")
}
if i == 1 {
<-ctx.Done()
return fmt.Errorf("ccc")
}
panic("unreachable")
})
}
xwait("bbb", 1, 2)
// t1=ok,wait cancel t2=ok,wait cancel
// cancel parent
wg = NewWorkGroup(ctx)
l = []int{0, 0}
for i := 0; i < 2; i++ {
i := i
wg.Go(func(ctx context.Context) error {
l[i] = i+1
<-ctx.Done()
return nil
})
}
cancel() // parent cancel - must be propagated into workgroup
xwait("", 1, 2)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment