Commit 68353ca2 authored by Kirill Smelkov's avatar Kirill Smelkov

X Move WorkGroup to xcommon/xsync

Not yet to go123/xsync as maybe placing exceptions somewhere besides
tests is not so good - I will keep on thinking.
parent 5c4b5fea
...@@ -26,35 +26,12 @@ import ( ...@@ -26,35 +26,12 @@ import (
"testing" "testing"
"time" "time"
"golang.org/x/sync/errgroup" "../xcommon/xsync"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
) )
// XXX move me out of here ?
type workGroup struct {
*errgroup.Group
}
// like errgroup.Go but translates exceptions to errors
func (wg *workGroup) Gox(xf func ()) {
wg.Go(func() error {
return exc.Runx(xf)
})
}
func WorkGroup() *workGroup {
return &workGroup{&errgroup.Group{}}
}
func WorkGroupCtx(ctx context.Context) (*workGroup, context.Context) {
g, ctx := errgroup.WithContext(ctx)
return &workGroup{g}, ctx
}
////////////////////////////////////////
func xclose(c io.Closer) { func xclose(c io.Closer) {
err := c.Close() err := c.Close()
exc.Raiseif(err) exc.Raiseif(err)
...@@ -176,20 +153,13 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -176,20 +153,13 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
// XXX temp for cluster_test.go // XXX temp for cluster_test.go
var NodeLinkPipe = nodeLinkPipe var NodeLinkPipe = nodeLinkPipe
func gox(wg *errgroup.Group, f func()) {
wg.Go(exc.Runx(f))
}
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
//wg := WorkGroup() wg := &xsync.WorkGroup{}
//wg.Gox(func() { wg.Gox(func() {
wg := &errgroup.Group{}
wg := &xsync.ErrGroup{}
gox(wg, func() {
tdelay() tdelay()
xclose(nl1) xclose(nl1)
}) })
...@@ -202,7 +172,7 @@ func TestNodeLink(t *testing.T) { ...@@ -202,7 +172,7 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
xclose(nl1) xclose(nl1)
...@@ -217,7 +187,7 @@ func TestNodeLink(t *testing.T) { ...@@ -217,7 +187,7 @@ func TestNodeLink(t *testing.T) {
// Close vs Accept // Close vs Accept
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
...@@ -236,7 +206,7 @@ func TestNodeLink(t *testing.T) { ...@@ -236,7 +206,7 @@ func TestNodeLink(t *testing.T) {
// Close vs recvPkt on another side // Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
...@@ -250,7 +220,7 @@ func TestNodeLink(t *testing.T) { ...@@ -250,7 +220,7 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt on another side // Close vs sendPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
...@@ -266,7 +236,7 @@ func TestNodeLink(t *testing.T) { ...@@ -266,7 +236,7 @@ func TestNodeLink(t *testing.T) {
// raw exchange // raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg, ctx := WorkGroupCtx(context.Background()) wg, ctx := xsync.WorkGroupCtx(context.Background())
wg.Gox(func() { wg.Gox(func() {
// send ping; wait for pong // send ping; wait for pong
pkt := _mkpkt(1, 2, []byte("ping")) pkt := _mkpkt(1, 2, []byte("ping"))
...@@ -283,7 +253,7 @@ func TestNodeLink(t *testing.T) { ...@@ -283,7 +253,7 @@ func TestNodeLink(t *testing.T) {
}) })
// close nodelinks either when checks are done, or upon first error // close nodelinks either when checks are done, or upon first error
wgclose := WorkGroup() wgclose := &xsync.WorkGroup{}
wgclose.Gox(func() { wgclose.Gox(func() {
<-ctx.Done() <-ctx.Done()
xclose(nl1) xclose(nl1)
...@@ -299,7 +269,7 @@ func TestNodeLink(t *testing.T) { ...@@ -299,7 +269,7 @@ func TestNodeLink(t *testing.T) {
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1) c = xnewconn(nl1)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
xclose(c) xclose(c)
...@@ -315,7 +285,7 @@ func TestNodeLink(t *testing.T) { ...@@ -315,7 +285,7 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1) c = xnewconn(nl1)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
xclose(c) xclose(c)
...@@ -330,7 +300,7 @@ func TestNodeLink(t *testing.T) { ...@@ -330,7 +300,7 @@ func TestNodeLink(t *testing.T) {
// NodeLink.Close vs Conn.sendPkt/recvPkt // NodeLink.Close vs Conn.sendPkt/recvPkt
c11 := xnewconn(nl1) c11 := xnewconn(nl1)
c12 := xnewconn(nl1) c12 := xnewconn(nl1)
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
pkt, err := c11.recvPkt() pkt, err := c11.recvPkt()
if !(pkt == nil && xconnError(err) == ErrLinkClosed) { if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
...@@ -356,7 +326,7 @@ func TestNodeLink(t *testing.T) { ...@@ -356,7 +326,7 @@ func TestNodeLink(t *testing.T) {
c21 := xnewconn(nl2) c21 := xnewconn(nl2)
c22 := xnewconn(nl2) c22 := xnewconn(nl2)
c23 := xnewconn(nl2) c23 := xnewconn(nl2)
wg = WorkGroup() wg = &xsync.WorkGroup{}
var errRecv error var errRecv error
wg.Gox(func() { wg.Gox(func() {
pkt, err := c21.recvPkt() pkt, err := c21.recvPkt()
...@@ -469,7 +439,7 @@ func TestNodeLink(t *testing.T) { ...@@ -469,7 +439,7 @@ func TestNodeLink(t *testing.T) {
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
c := xaccept(nl2) c := xaccept(nl2)
...@@ -501,7 +471,7 @@ func TestNodeLink(t *testing.T) { ...@@ -501,7 +471,7 @@ func TestNodeLink(t *testing.T) {
// test 2 channels with replies coming in reversed time order // test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = WorkGroup() wg = &xsync.WorkGroup{}
replyOrder := map[uint16]struct { // "order" in which to process requests replyOrder := map[uint16]struct { // "order" in which to process requests
start chan struct{} // processing starts when start chan is ready start chan struct{} // processing starts when start chan is ready
next uint16 // after processing this switch to next next uint16 // after processing this switch to next
...@@ -559,7 +529,7 @@ func TestHandshake(t *testing.T) { ...@@ -559,7 +529,7 @@ func TestHandshake(t *testing.T) {
bg := context.Background() bg := context.Background()
// handshake ok // handshake ok
p1, p2 := net.Pipe() p1, p2 := net.Pipe()
wg := WorkGroup() wg := &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
xhandshake(bg, p1, 1) xhandshake(bg, p1, 1)
}) })
...@@ -573,7 +543,7 @@ func TestHandshake(t *testing.T) { ...@@ -573,7 +543,7 @@ func TestHandshake(t *testing.T) {
// version mismatch // version mismatch
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
var err1, err2 error var err1, err2 error
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
err1 = handshake(bg, p1, 1) err1 = handshake(bg, p1, 1)
}) })
...@@ -597,7 +567,7 @@ func TestHandshake(t *testing.T) { ...@@ -597,7 +567,7 @@ func TestHandshake(t *testing.T) {
// tx & rx problem // tx & rx problem
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
err1, err2 = nil, nil err1, err2 = nil, nil
wg = WorkGroup() wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
err1 = handshake(bg, p1, 1) err1 = handshake(bg, p1, 1)
}) })
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// Package xsync provides addons to packages "sync" and "golang.org/x/sync"
package xsync
import (
"context"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
)
// WorkGroup is like x/sync/errgroup.Group but also supports exceptions
type WorkGroup struct {
errgroup.Group
}
// Gox calls the given function in a new goroutine and handles exceptions
//
// it translates exception raised, if any, to as if it was regular error
// returned for a function under Go call.
//
// see errgroup.Group.Go documentation for details on how error from spawned
// goroutines are handled group-wise.
func (g *WorkGroup) Gox(xf func()) {
g.Go(func() error {
return exc.Runx(xf)
})
}
// WorkGroupCtx returns new WorkGroup and associated context derived from ctx
// see errgroup.WithContext for semantic description and details.
func WorkGroupCtx(ctx context.Context) (*WorkGroup, context.Context) {
g, ctx := errgroup.WithContext(ctx)
return &WorkGroup{*g}, ctx
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
package xsync
import (
"context"
"testing"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/my"
)
func TestWorkGroup(t *testing.T) {
g := WorkGroup{}
g.Gox(func() {
exc.Raise(1)
})
err := g.Wait()
e, ok := err.(*exc.Error)
want := my.FuncName() + ".func1: 1"
if !(ok && e.Error() == want) {
t.Fatalf("gox:\nhave: %v\nwant: %v", err, want)
}
g2, ctx := WorkGroupCtx(context.Background())
g2.Gox(func() {
exc.Raise(2)
})
<-ctx.Done()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment