Commit 190c83b5 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0045b444
...@@ -20,99 +20,123 @@ import ( ...@@ -20,99 +20,123 @@ import (
"bytes" "bytes"
"context" "context"
"io" "io"
"fmt" //"fmt"
"net" "net"
"testing" "testing"
"time" "time"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/myname"
"lab.nexedi.com/kirr/go123/xerr"
) )
// run function which raises exception, and return exception as error, if any
// XXX -> exc.Runx ?
func runx(xf func()) (err error) {
//here := my.FuncName()
here := myname.Func()
defer exc.Catch(func(e *exc.Error) {
err = exc.Addcallingcontext(here, e)
})
xf()
return
}
// XXX move me out of here ?
type workGroup struct {
*errgroup.Group
}
// like errgroup.Go but translates exceptions to errors
func (wg *workGroup) Gox(xf func ()) {
wg.Go(func() error {
return runx(xf)
})
}
func WorkGroup() *workGroup {
return &workGroup{&errgroup.Group{}}
}
func WorkGroupCtx(ctx context.Context) (*workGroup, context.Context) {
g, ctx := errgroup.WithContext(ctx)
return &workGroup{g}, ctx
}
////////////////////////////////////////
func xclose(c io.Closer) {
err := c.Close()
exc.Raiseif(err)
}
func xsend(c *Conn, pkt *PktBuf) { func xsend(c *Conn, pkt *PktBuf) {
err := c.Send(pkt) err := c.Send(pkt)
if err != nil { exc.Raiseif(err)
//t.Fatal(err) // XXX make sure this happens in main goroutine
panic("TODO")
}
} }
func xrecv(c *Conn) *PktBuf { func xrecv(c *Conn) *PktBuf {
pkt, err := c.Recv() pkt, err := c.Recv()
if err != nil { exc.Raiseif(err)
//t.Fatal(err) // XXX make sure this happens in main goroutine
panic("TODO")
}
return pkt return pkt
} }
func xsendPkt(nl *NodeLink, pkt *PktBuf) { func xsendPkt(nl *NodeLink, pkt *PktBuf) {
err := nl.sendPkt(pkt) err := nl.sendPkt(pkt)
if err != nil { exc.Raiseif(err)
panic("TODO")
}
} }
func xrecvPkt(nl *NodeLink) *PktBuf { func xrecvPkt(nl *NodeLink) *PktBuf {
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
if err != nil { exc.Raiseif(err)
panic("TODO")
}
return pkt return pkt
} }
/*
func xspawn(funcv ...func()) {
for f := range funcv {
go func() {
defer func() {
e := recover()
if e == nil {
return
}
}
}
}
}
*/
// // run f in a goroutine; check its return error; if != nil -> t.Fatal in main goroutine
// func xgo(t *testing.T, f func() error) {
// var err error
// done := make(chan struct{})
// go func() {
// err = f()
// close(done)
// if err != nil {
// panic(err) // XXX temp - see vvv
// }
// }()
// /* FIXME below just blocks main goroutine waiting for f() to complete
// <-done
// if err != nil {
// t.Fatal(err) // TODO adjust lineno (report calling location, not here)
// }
// */
// }
func xwait(t *testing.T, w interface { Wait() error }) { func xwait(t *testing.T, w interface { Wait() error }) {
err := w.Wait() err := w.Wait()
if err != nil { exc.Raiseif(err)
/*
if err != nil { // XXX -> exc.Raise ?
t.Fatal(err) // TODO include caller location t.Fatal(err) // TODO include caller location
} }
*/
} }
// Prepare PktBuf with content // Prepare PktBuf with content
func mkpkt(msgid uint32, msgcode uint16, payload []byte) *PktBuf { func mkpkt(msgid uint32, msgcode uint16, payload []byte) *PktBuf {
pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))} pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))}
pkth := pkt.Header() h := pkt.Header()
pkth.MsgId = hton32(msgid) h.MsgId = hton32(msgid)
pkth.MsgCode = hton16(msgcode) h.MsgCode = hton16(msgcode)
pkth.Len = hton32(PktHeadLen + 4) h.Len = hton32(PktHeadLen + 4)
copy(pkt.Payload(), payload) copy(pkt.Payload(), payload)
return pkt return pkt
} }
// Verify PktBuf is as expected
func xverifyPkt(pkt *PktBuf, msgid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{}
h := pkt.Header()
// TODO include caller location
if ntoh32(h.MsgId) != msgid {
errv.Appendf("header: unexpected msgid %v (want %v)", ntoh32(h.MsgId), msgid)
}
if ntoh16(h.MsgCode) != msgcode {
errv.Appendf("header: unexpected msgcode %v (want %v)", ntoh16(h.MsgCode), msgcode)
}
if ntoh32(h.Len) != uint32(PktHeadLen + len(payload)) {
errv.Appendf("header: unexpected length %v (want %v)", ntoh32(h.Len), PktHeadLen + len(payload))
}
if !bytes.Equal(pkt.Payload(), payload) {
errv.Appendf("payload differ") // XXX also print payload ?
}
exc.Raiseif( errv.Err() )
}
// delay a bit // delay a bit
// needed e.g. to test Close interaction with waiting read or write // needed e.g. to test Close interaction with waiting read or write
// (we cannot easily sync and make sure e.g. read is started and became asleep) // (we cannot easily sync and make sure e.g. read is started and became asleep)
...@@ -130,32 +154,34 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -130,32 +154,34 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
// Close vs recvPkt // Close vs recvPkt
g := &errgroup.Group{} wg := WorkGroup()
g.Go(func() error { wg.Gox(func() {
tdelay() tdelay()
return nl1.Close() xclose(nl1)
}) })
pkt, err := nl1.recvPkt() pkt, err := nl1.recvPkt()
if !(pkt == nil && err == io.ErrClosedPipe) { if !(pkt == nil && err == io.ErrClosedPipe) {
t.Fatalf("NodeLink.recvPkt() after close: pkt = %v err = %v", pkt, err) t.Fatalf("NodeLink.recvPkt() after close: pkt = %v err = %v", pkt, err)
} }
xwait(t, g) xwait(t, wg)
// Close vs sendPkt // Close vs sendPkt
g = &errgroup.Group{} wg = WorkGroup()
g.Go(func() error { wg.Gox(func() {
tdelay() tdelay()
return nl2.Close() xclose(nl2)
}) })
pkt = &PktBuf{[]byte("data")} pkt = &PktBuf{[]byte("data")}
err = nl2.sendPkt(pkt) err = nl2.sendPkt(pkt)
if err != io.ErrClosedPipe { if err != io.ErrClosedPipe {
t.Fatalf("NodeLink.sendPkt() after close: err = %v", err) t.Fatalf("NodeLink.sendPkt() after close: err = %v", err)
} }
xwait(t, g) xwait(t, wg)
// TODO (?) every func: run with exception catcher (including t.Fatal) // TODO (?) every func: run with exception catcher (including t.Fatal)
// if caught: // if caught:
...@@ -165,10 +191,14 @@ func TestNodeLink(t *testing.T) { ...@@ -165,10 +191,14 @@ func TestNodeLink(t *testing.T) {
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
// check raw exchange works // check raw exchange works
g, ctx := errgroup.WithContext(context.Background()) wg, ctx := WorkGroupCtx(context.Background())
g.Go(func() error { wg.Gox(func() {
// send ping; wait for pong // send ping; wait for pong
pkt := mkpkt(1, 2, []byte("ping")) pkt := mkpkt(1, 2, []byte("ping"))
xsendPkt(nl1, pkt)
pkt = xrecvPkt(nl1)
xverifyPkt(pkt, 3, 4, []byte("pong"))
/*
err := nl1.sendPkt(pkt) err := nl1.sendPkt(pkt)
if err != nil { if err != nil {
t.Errorf("nl1.sendPkt: %v", err) t.Errorf("nl1.sendPkt: %v", err)
...@@ -179,28 +209,28 @@ func TestNodeLink(t *testing.T) { ...@@ -179,28 +209,28 @@ func TestNodeLink(t *testing.T) {
t.Errorf("nl1.recvPkt: %v", err) t.Errorf("nl1.recvPkt: %v", err)
return err return err
} }
// TODO vvv also check msgid, msgcode err = verifyPkt("nl1 received", t, pkt, 3, 4, []byte("pong"))
if !bytes.Equal(pkt.Payload(), []byte("pong")) { if err != nil {
// XXX vvv -> util ? return err
e := fmt.Errorf("nl1 received: %v ; want \"pong\"", pkt.Data)
t.Error(e)
return e
} }
return nil return nil
*/
}) })
g.Go(func() error { wg.Gox(func() {
// wait for ping; send pong // wait for ping; send pong
pkt = xrecvPkt(nl2)
xverifyPkt(pkt, 1, 2, []byte("ping"))
pkt = mkpkt(3, 4, []byte("pong"))
xsendPkt(nl2, pkt)
/*
pkt, err := nl2.recvPkt() pkt, err := nl2.recvPkt()
if err != nil { if err != nil {
t.Errorf("nl2.recvPkt: %v", err) t.Errorf("nl2.recvPkt: %v", err)
return err return err
} }
// TODO vvv also check msgid, msgcode err = verifyPkt("nl2 received", t, pkt, 1, 2, []byte("ping"))
if !bytes.Equal(pkt.Payload(), []byte("ping")) { if err != nil {
// XXX vvv -> util ? return err
e := fmt.Errorf("nl2 received: %v ; want \"ping\"", pkt.Data)
t.Error(e)
return e
} }
pkt = mkpkt(3, 4, []byte("pong")) pkt = mkpkt(3, 4, []byte("pong"))
err = nl2.sendPkt(pkt) err = nl2.sendPkt(pkt)
...@@ -209,47 +239,20 @@ func TestNodeLink(t *testing.T) { ...@@ -209,47 +239,20 @@ func TestNodeLink(t *testing.T) {
return err return err
} }
return nil return nil
})
g2 := &errgroup.Group{}
g2.Go(func() error {
<-ctx.Done()
ev := xerror.Errorv{}
ev.addif( nl1.Close() )
ev.addif( nl2.Close() )
return ev.Reduce() // nil if len==0; [0] if len==1; [...] otherwise
/*
for c := range []io.Closer{nl1, nl2} {
err := c.Close()
if err != nil {
ev = append(ev, err)
}
}
*/
/*
err := nl1.Close()
err2 := nl2.Close()
if err == nil {
err = err2
}
*/ */
}) })
xwait(g)
xwait(g2)
// close nodelinks either when checks are done, or upon first error // close nodelinks either when checks are done, or upon first error
closed := make(chan struct{}) wgclose := WorkGroup()
go func() { wgclose.Gox(func() {
<-ctx.Done() <-ctx.Done()
nl1.Close() // XXX err xclose(nl1)
nl2.Close() // XXX err xclose(nl2)
close(closed) })
}()
xwait(t, wg)
xwait(t, wgclose)
xwait(t, g)
<-closed
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment