Commit d44994d6 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/neonet: errgroup -> xsync.WorkGroup

See go123@515a6d14
parent 2de637bb
...@@ -29,10 +29,9 @@ import ( ...@@ -29,10 +29,9 @@ import (
"testing" "testing"
"time" "time"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/packed" "lab.nexedi.com/kirr/neo/go/internal/packed"
...@@ -76,8 +75,12 @@ func xwait(w interface{ Wait() error }) { ...@@ -76,8 +75,12 @@ func xwait(w interface{ Wait() error }) {
exc.Raiseif(err) exc.Raiseif(err)
} }
func gox(wg interface{ Go(func() error) }, xf func()) { func gox(wg interface { Go(func(context.Context) error) }, xf func(context.Context)) {
wg.Go(exc.Funcx(xf)) wg.Go(func(ctx context.Context) error {
return exc.Funcx(func() {
xf(ctx)
})()
})
} }
// xlinkError verifies that err is *LinkError and returns err.Err . // xlinkError verifies that err is *LinkError and returns err.Err .
...@@ -171,11 +174,12 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -171,11 +174,12 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
bg := context.Background()
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &errgroup.Group{} wg := xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(nl1) xclose(nl1)
}) })
...@@ -188,8 +192,8 @@ func TestNodeLink(t *testing.T) { ...@@ -188,8 +192,8 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(nl1) xclose(nl1)
}) })
...@@ -203,8 +207,8 @@ func TestNodeLink(t *testing.T) { ...@@ -203,8 +207,8 @@ func TestNodeLink(t *testing.T) {
// {Close,CloseAccept} vs Accept // {Close,CloseAccept} vs Accept
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
}) })
...@@ -212,7 +216,7 @@ func TestNodeLink(t *testing.T) { ...@@ -212,7 +216,7 @@ func TestNodeLink(t *testing.T) {
if !(c == nil && xlinkError(err) == ErrLinkClosed) { if !(c == nil && xlinkError(err) == ErrLinkClosed) {
t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err) t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
} }
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
nl1.CloseAccept() nl1.CloseAccept()
}) })
...@@ -231,8 +235,8 @@ func TestNodeLink(t *testing.T) { ...@@ -231,8 +235,8 @@ func TestNodeLink(t *testing.T) {
// Close vs recvPkt on another side // Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
}) })
...@@ -245,8 +249,8 @@ func TestNodeLink(t *testing.T) { ...@@ -245,8 +249,8 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt on another side // Close vs sendPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
}) })
...@@ -261,40 +265,54 @@ func TestNodeLink(t *testing.T) { ...@@ -261,40 +265,54 @@ func TestNodeLink(t *testing.T) {
// raw exchange // raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg, ctx := errgroup.WithContext(context.Background()) wg = xsync.NewWorkGroup(bg)
gox(wg, func() { okch := make(chan int, 2)
gox(wg, func(_ context.Context) {
// send ping; wait for pong // send ping; wait for pong
pkt := _mkpkt(1, 2, []byte("ping")) pkt := _mkpkt(1, 2, []byte("ping"))
xsendPkt(nl1, pkt) xsendPkt(nl1, pkt)
pkt = xrecvPkt(nl1) pkt = xrecvPkt(nl1)
xverifyPkt(pkt, 3, 4, []byte("pong")) xverifyPkt(pkt, 3, 4, []byte("pong"))
okch <- 1
}) })
gox(wg, func() { gox(wg, func(_ context.Context) {
// wait for ping; send pong // wait for ping; send pong
pkt = xrecvPkt(nl2) pkt = xrecvPkt(nl2)
xverifyPkt(pkt, 1, 2, []byte("ping")) xverifyPkt(pkt, 1, 2, []byte("ping"))
pkt = _mkpkt(3, 4, []byte("pong")) pkt = _mkpkt(3, 4, []byte("pong"))
xsendPkt(nl2, pkt) xsendPkt(nl2, pkt)
okch <- 2
}) })
// close nodelinks either when checks are done, or upon first error // close nodelinks either when checks are done, or upon first error
wgclose := &errgroup.Group{} gox(wg, func(ctx context.Context) {
gox(wgclose, func() { nok := 0
<-ctx.Done() loop:
for {
select {
case <-ctx.Done():
break loop
case <-okch:
nok++
if nok == 2 {
break loop
}
}
}
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
}) })
xwait(wg) xwait(wg)
xwait(wgclose)
// ---- connections on top of nodelink ---- // ---- connections on top of nodelink ----
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1) c = xnewconn(nl1)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(c) xclose(c)
}) })
...@@ -309,8 +327,8 @@ func TestNodeLink(t *testing.T) { ...@@ -309,8 +327,8 @@ func TestNodeLink(t *testing.T) {
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1) c = xnewconn(nl1)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
tdelay() tdelay()
xclose(c) xclose(c)
}) })
...@@ -324,14 +342,14 @@ func TestNodeLink(t *testing.T) { ...@@ -324,14 +342,14 @@ func TestNodeLink(t *testing.T) {
// NodeLink.Close vs Conn.sendPkt/recvPkt // NodeLink.Close vs Conn.sendPkt/recvPkt
c11 := xnewconn(nl1) c11 := xnewconn(nl1)
c12 := xnewconn(nl1) c12 := xnewconn(nl1)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
pkt, err := c11.recvPkt() pkt, err := c11.recvPkt()
if !(pkt == nil && xconnError(err) == ErrLinkClosed) { if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
exc.Raisef("Conn.recvPkt() after NodeLink close: pkt = %v err = %v", pkt, err) exc.Raisef("Conn.recvPkt() after NodeLink close: pkt = %v err = %v", pkt, err)
} }
}) })
gox(wg, func() { gox(wg, func(_ context.Context) {
pkt := c12.mkpkt(0, []byte("data")) pkt := c12.mkpkt(0, []byte("data"))
err := c12.sendPkt(pkt) err := c12.sendPkt(pkt)
if xconnError(err) != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
...@@ -350,9 +368,9 @@ func TestNodeLink(t *testing.T) { ...@@ -350,9 +368,9 @@ func TestNodeLink(t *testing.T) {
c21 := xnewconn(nl2) c21 := xnewconn(nl2)
c22 := xnewconn(nl2) c22 := xnewconn(nl2)
c23 := xnewconn(nl2) c23 := xnewconn(nl2)
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
var errRecv error var errRecv error
gox(wg, func() { gox(wg, func(_ context.Context) {
pkt, err := c21.recvPkt() pkt, err := c21.recvPkt()
want1 := io.EOF // if recvPkt wakes up due to peer close want1 := io.EOF // if recvPkt wakes up due to peer close
want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1 want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1
...@@ -363,7 +381,7 @@ func TestNodeLink(t *testing.T) { ...@@ -363,7 +381,7 @@ func TestNodeLink(t *testing.T) {
errRecv = cerr errRecv = cerr
}) })
gox(wg, func() { gox(wg, func(_ context.Context) {
pkt := c22.mkpkt(0, []byte("data")) pkt := c22.mkpkt(0, []byte("data"))
err := c22.sendPkt(pkt) err := c22.sendPkt(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2 want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
...@@ -372,7 +390,7 @@ func TestNodeLink(t *testing.T) { ...@@ -372,7 +390,7 @@ func TestNodeLink(t *testing.T) {
} }
}) })
gox(wg, func() { gox(wg, func(_ context.Context) {
conn, err := nl2.Accept() conn, err := nl2.Accept()
if !(conn == nil && xlinkError(err) == ErrLinkDown) { if !(conn == nil && xlinkError(err) == ErrLinkDown) {
exc.Raisef("Accept after peer NodeLink shutdown: conn = %v err = %v", conn, err) exc.Raisef("Accept after peer NodeLink shutdown: conn = %v err = %v", conn, err)
...@@ -466,9 +484,9 @@ func TestNodeLink(t *testing.T) { ...@@ -466,9 +484,9 @@ func TestNodeLink(t *testing.T) {
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
nl1.CloseAccept() nl1.CloseAccept()
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
closed := make(chan int) closed := make(chan int)
gox(wg, func() { gox(wg, func(_ context.Context) {
c := xaccept(nl2) c := xaccept(nl2)
pkt := xrecvPkt(c) pkt := xrecvPkt(c)
...@@ -560,7 +578,7 @@ func TestNodeLink(t *testing.T) { ...@@ -560,7 +578,7 @@ func TestNodeLink(t *testing.T) {
// test 2 channels with replies coming in reversed time order // test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
replyOrder := map[uint16]struct { // "order" in which to process requests replyOrder := map[uint16]struct { // "order" in which to process requests
start chan struct{} // processing starts when start chan is ready start chan struct{} // processing starts when start chan is ready
next uint16 // after processing this switch to next next uint16 // after processing this switch to next
...@@ -570,11 +588,11 @@ func TestNodeLink(t *testing.T) { ...@@ -570,11 +588,11 @@ func TestNodeLink(t *testing.T) {
} }
close(replyOrder[2].start) close(replyOrder[2].start)
gox(wg, func() { gox(wg, func(_ context.Context) {
for _ = range replyOrder { for _ = range replyOrder {
c := xaccept(nl2) c := xaccept(nl2)
gox(wg, func() { gox(wg, func(_ context.Context) {
pkt := xrecvPkt(c) pkt := xrecvPkt(c)
n := packed.Ntoh16(pkt.Header().MsgCode) n := packed.Ntoh16(pkt.Header().MsgCode)
x := replyOrder[n] x := replyOrder[n]
...@@ -645,11 +663,13 @@ func xverifyMsg(msg1, msg2 proto.Msg) { ...@@ -645,11 +663,13 @@ func xverifyMsg(msg1, msg2 proto.Msg) {
} }
func TestRecv1Mode(t *testing.T) { func TestRecv1Mode(t *testing.T) {
bg := context.Background()
// Send1 // Send1
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
wg := &errgroup.Group{} wg := xsync.NewWorkGroup(bg)
sync := make(chan int) sync := make(chan int)
gox(wg, func() { gox(wg, func(_ context.Context) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
panic(e) panic(e)
...@@ -685,8 +705,8 @@ func TestRecv1Mode(t *testing.T) { ...@@ -685,8 +705,8 @@ func TestRecv1Mode(t *testing.T) {
xwait(wg) xwait(wg)
// Recv1: further packets with same connid are rejected with "connection closed" // Recv1: further packets with same connid are rejected with "connection closed"
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(_ context.Context) {
c := xnewconn(nl2) c := xnewconn(nl2)
xSend(c, &proto.Ping{}) xSend(c, &proto.Ping{})
...@@ -711,11 +731,11 @@ func TestRecv1Mode(t *testing.T) { ...@@ -711,11 +731,11 @@ func TestRecv1Mode(t *testing.T) {
// bug triggers under -race. // bug triggers under -race.
func TestLightCloseVsLinkShutdown(t *testing.T) { func TestLightCloseVsLinkShutdown(t *testing.T) {
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
wg := &errgroup.Group{} wg := xsync.NewWorkGroup(context.Background())
c := xnewconn(nl1) c := xnewconn(nl1)
nl1.shutdown() nl1.shutdown()
gox(wg, func() { gox(wg, func(_ context.Context) {
c.lightClose() c.lightClose()
}) })
...@@ -1062,14 +1082,14 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) { ...@@ -1062,14 +1082,14 @@ func benchmarkLinkRTT(b *testing.B, l1, l2 *NodeLink) {
func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) { func xlinkPipe(c1, c2 net.Conn) (*NodeLink, *NodeLink) {
var l1, l2 *NodeLink var l1, l2 *NodeLink
wg := &errgroup.Group{} wg := xsync.NewWorkGroup(context.Background())
gox(wg, func() { gox(wg, func(ctx context.Context) {
l, err := _Handshake(context.Background(), c1, _LinkClient) l, err := _Handshake(ctx, c1, _LinkClient)
exc.Raiseif(err) exc.Raiseif(err)
l1 = l l1 = l
}) })
gox(wg, func() { gox(wg, func(ctx context.Context) {
l, err := _Handshake(context.Background(), c2, _LinkServer) l, err := _Handshake(ctx, c2, _LinkServer)
exc.Raiseif(err) exc.Raiseif(err)
l2 = l l2 = l
}) })
......
// Copyright (C) 2016-2018 Nexedi SA and Contributors. // Copyright (C) 2016-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -25,9 +25,8 @@ import ( ...@@ -25,9 +25,8 @@ import (
"net" "net"
"testing" "testing"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xsync"
) )
func xhandshake(ctx context.Context, c net.Conn, version uint32) { func xhandshake(ctx context.Context, c net.Conn, version uint32) {
...@@ -39,12 +38,12 @@ func TestHandshake(t *testing.T) { ...@@ -39,12 +38,12 @@ func TestHandshake(t *testing.T) {
bg := context.Background() bg := context.Background()
// handshake ok // handshake ok
p1, p2 := net.Pipe() p1, p2 := net.Pipe()
wg := &errgroup.Group{} wg := xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(ctx context.Context) {
xhandshake(bg, p1, 1) xhandshake(ctx, p1, 1)
}) })
gox(wg, func() { gox(wg, func(ctx context.Context) {
xhandshake(bg, p2, 1) xhandshake(ctx, p2, 1)
}) })
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
...@@ -53,12 +52,12 @@ func TestHandshake(t *testing.T) { ...@@ -53,12 +52,12 @@ func TestHandshake(t *testing.T) {
// version mismatch // version mismatch
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
var err1, err2 error var err1, err2 error
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(ctx context.Context) {
err1 = handshake(bg, p1, 1) err1 = handshake(ctx, p1, 1)
}) })
gox(wg, func() { gox(wg, func(ctx context.Context) {
err2 = handshake(bg, p2, 2) err2 = handshake(ctx, p2, 2)
}) })
xwait(wg) xwait(wg)
xclose(p1) xclose(p1)
...@@ -77,11 +76,11 @@ func TestHandshake(t *testing.T) { ...@@ -77,11 +76,11 @@ func TestHandshake(t *testing.T) {
// tx & rx problem // tx & rx problem
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
err1, err2 = nil, nil err1, err2 = nil, nil
wg = &errgroup.Group{} wg = xsync.NewWorkGroup(bg)
gox(wg, func() { gox(wg, func(ctx context.Context) {
err1 = handshake(bg, p1, 1) err1 = handshake(ctx, p1, 1)
}) })
gox(wg, func() { gox(wg, func(_ context.Context) {
xclose(p2) xclose(p2)
}) })
xwait(wg) xwait(wg)
...@@ -96,7 +95,8 @@ func TestHandshake(t *testing.T) { ...@@ -96,7 +95,8 @@ func TestHandshake(t *testing.T) {
// ctx cancel // ctx cancel
p1, p2 = net.Pipe() p1, p2 = net.Pipe()
ctx, cancel := context.WithCancel(bg) ctx, cancel := context.WithCancel(bg)
gox(wg, func() { wg = xsync.NewWorkGroup(ctx)
gox(wg, func(ctx context.Context) {
err1 = handshake(ctx, p1, 1) err1 = handshake(ctx, p1, 1)
}) })
tdelay() tdelay()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment