Commit 4c098a14 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4f871a72
...@@ -51,6 +51,16 @@ type T struct { ...@@ -51,6 +51,16 @@ type T struct {
enc proto.Encoding // encoding to use for messages exchange enc proto.Encoding // encoding to use for messages exchange
} }
// bin returns payload for raw binary data as would-be encoded by t.enc .
// XXX place
func (t *T) bin(data string) []byte {
switch t.enc {
case 'N': return []byte(data)
case 'M': return msgp.AppendBytes(nil, []byte(data))
default: panic("bug")
}
}
// Verify tests f for all possible environments. // Verify tests f for all possible environments.
func Verify(t *testing.T, f func(*T)) { func Verify(t *testing.T, f func(*T)) {
// for each encoding // for each encoding
...@@ -139,10 +149,11 @@ func _mkpkt(enc proto.Encoding, connid uint32, msgcode uint16, payload []byte) * ...@@ -139,10 +149,11 @@ func _mkpkt(enc proto.Encoding, connid uint32, msgcode uint16, payload []byte) *
b = msgp.AppendArrayHeader (b, 3) b = msgp.AppendArrayHeader (b, 3)
b = msgp.AppendUint32 (b, connid) b = msgp.AppendUint32 (b, connid)
b = msgp.AppendUint16 (b, msgcode) b = msgp.AppendUint16 (b, msgcode)
// NOTE payload is appended wrapped into bin object. We need // // NOTE payload is appended wrapped into bin object. We need
// this not to break framing, because in M-encoding whole // // this not to break framing, because in M-encoding whole
// packet must be a valid msgpack object. // // packet must be a valid msgpack object.
b = msgp.AppendBytes (b, payload) // b = msgp.AppendBytes (b, payload)
b = append (b, payload...)
return &pktBuf{b} return &pktBuf{b}
default: default:
...@@ -170,10 +181,10 @@ func (t *T) xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byt ...@@ -170,10 +181,10 @@ func (t *T) xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byt
errv.Appendf("header: unexpected msgcode %v (want %v)", pktMsgCode, msgcode) errv.Appendf("header: unexpected msgcode %v (want %v)", pktMsgCode, msgcode)
} }
// M-encoding -> wrap payloadOK into bin (see _mkpkt ^^^ for why) // // M-encoding -> wrap payloadOK into bin (see _mkpkt ^^^ for why)
if t.enc == 'M' { // if t.enc == 'M' {
payload = msgp.AppendBytes(nil, payload) // payload = msgp.AppendBytes(nil, payload)
} // }
if !bytes.Equal(pktPayload, payload) { if !bytes.Equal(pktPayload, payload) {
errv.Appendf("payload differ:\n%s", errv.Appendf("payload differ:\n%s",
pretty.Compare(string(payload), string(pktPayload))) pretty.Compare(string(payload), string(pktPayload)))
...@@ -220,6 +231,7 @@ func TestNodeLink(t *testing.T) { ...@@ -220,6 +231,7 @@ func TestNodeLink(t *testing.T) {
} }
func _TestNodeLink(t *T) { func _TestNodeLink(t *T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
b := t.bin
bg := context.Background() bg := context.Background()
// Close vs recvPkt // Close vs recvPkt
...@@ -315,17 +327,17 @@ func _TestNodeLink(t *T) { ...@@ -315,17 +327,17 @@ func _TestNodeLink(t *T) {
okch := make(chan int, 2) okch := make(chan int, 2)
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
// send ping; wait for pong // send ping; wait for pong
pkt := _mkpkt(t.enc, 1, 2, []byte("ping")) pkt := _mkpkt(t.enc, 1, 2, b("ping"))
xsendPkt(nl1, pkt) xsendPkt(nl1, pkt)
pkt = xrecvPkt(nl1) pkt = xrecvPkt(nl1)
t.xverifyPkt(pkt, 3, 4, []byte("pong")) t.xverifyPkt(pkt, 3, 4, b("pong"))
okch <- 1 okch <- 1
}) })
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
// wait for ping; send pong // wait for ping; send pong
pkt = xrecvPkt(nl2) pkt = xrecvPkt(nl2)
t.xverifyPkt(pkt, 1, 2, []byte("ping")) t.xverifyPkt(pkt, 1, 2, b("ping"))
pkt = _mkpkt(t.enc, 3, 4, []byte("pong")) pkt = _mkpkt(t.enc, 3, 4, b("pong"))
xsendPkt(nl2, pkt) xsendPkt(nl2, pkt)
okch <- 2 okch <- 2
}) })
...@@ -378,7 +390,7 @@ func _TestNodeLink(t *T) { ...@@ -378,7 +390,7 @@ func _TestNodeLink(t *T) {
tdelay() tdelay()
xclose(c) xclose(c)
}) })
pkt = c.mkpkt(0, []byte("data")) pkt = c.mkpkt(0, b("data"))
err = c.sendPkt(pkt) err = c.sendPkt(pkt)
if xconnError(err) != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt() after close: err = %v", err) t.Fatalf("Conn.sendPkt() after close: err = %v", err)
...@@ -396,7 +408,7 @@ func _TestNodeLink(t *T) { ...@@ -396,7 +408,7 @@ func _TestNodeLink(t *T) {
} }
}) })
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
pkt := c12.mkpkt(0, []byte("data")) pkt := c12.mkpkt(0, b("data"))
err := c12.sendPkt(pkt) err := c12.sendPkt(pkt)
if xconnError(err) != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
exc.Raisef("Conn.sendPkt() after NodeLink close: err = %v", err) exc.Raisef("Conn.sendPkt() after NodeLink close: err = %v", err)
...@@ -428,7 +440,7 @@ func _TestNodeLink(t *T) { ...@@ -428,7 +440,7 @@ func _TestNodeLink(t *T) {
errRecv = cerr errRecv = cerr
}) })
gox(wg, func(_ context.Context) { gox(wg, func(_ context.Context) {
pkt := c22.mkpkt(0, []byte("data")) pkt := c22.mkpkt(0, b("data"))
err := c22.sendPkt(pkt) err := c22.sendPkt(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2 want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
if xconnError(err) != want { if xconnError(err) != want {
...@@ -465,7 +477,7 @@ func _TestNodeLink(t *T) { ...@@ -465,7 +477,7 @@ func _TestNodeLink(t *T) {
if !(pkt == nil && xconnError(err) == errRecv) { if !(pkt == nil && xconnError(err) == errRecv) {
t.Fatalf("Conn.recvPkt 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c23.sendPkt(c23.mkpkt(0, []byte("data"))) err = c23.sendPkt(c23.mkpkt(0, b("data")))
if xconnError(err) != ErrLinkDown { if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.sendPkt 2 after peer NodeLink shutdown: %v", err) t.Fatalf("Conn.sendPkt 2 after peer NodeLink shutdown: %v", err)
} }
...@@ -475,7 +487,7 @@ func _TestNodeLink(t *T) { ...@@ -475,7 +487,7 @@ func _TestNodeLink(t *T) {
if !(pkt == nil && xconnError(err) == ErrLinkDown) { if !(pkt == nil && xconnError(err) == ErrLinkDown) {
t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c22.sendPkt(c22.mkpkt(0, []byte("data"))) err = c22.sendPkt(c22.mkpkt(0, b("data")))
if xconnError(err) != ErrLinkDown { if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err) t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err)
} }
...@@ -486,7 +498,7 @@ func _TestNodeLink(t *T) { ...@@ -486,7 +498,7 @@ func _TestNodeLink(t *T) {
if !(pkt == nil && xconnError(err) == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt after close but only stopped NodeLink: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after close but only stopped NodeLink: pkt = %v err = %v", pkt, err)
} }
err = c23.sendPkt(c23.mkpkt(0, []byte("data"))) err = c23.sendPkt(c23.mkpkt(0, b("data")))
if xconnError(err) != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt after close but only stopped NodeLink: %v", err) t.Fatalf("Conn.sendPkt after close but only stopped NodeLink: %v", err)
} }
...@@ -497,7 +509,7 @@ func _TestNodeLink(t *T) { ...@@ -497,7 +509,7 @@ func _TestNodeLink(t *T) {
if !(pkt == nil && xconnError(err) == ErrLinkClosed) { if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c22.sendPkt(c22.mkpkt(0, []byte("data"))) err = c22.sendPkt(c22.mkpkt(0, b("data")))
if xconnError(err) != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err) t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err)
} }
...@@ -518,7 +530,7 @@ func _TestNodeLink(t *T) { ...@@ -518,7 +530,7 @@ func _TestNodeLink(t *T) {
if !(pkt == nil && xconnError(err) == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt after close and NodeLink close: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after close and NodeLink close: pkt = %v err = %v", pkt, err)
} }
err = c22.sendPkt(c22.mkpkt(0, []byte("data"))) err = c22.sendPkt(c22.mkpkt(0, b("data")))
if xconnError(err) != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt after close and NodeLink close: %v", err) t.Fatalf("Conn.sendPkt after close and NodeLink close: %v", err)
} }
...@@ -536,15 +548,15 @@ func _TestNodeLink(t *T) { ...@@ -536,15 +548,15 @@ func _TestNodeLink(t *T) {
c := xaccept(nl2) c := xaccept(nl2)
pkt := xrecvPkt(c) pkt := xrecvPkt(c)
t.xverifyPkt(pkt, c.connId, 33, []byte("ping")) t.xverifyPkt(pkt, c.connId, 33, b("ping"))
// change pkt a bit and send it back // change pkt a bit and send it back
xsendPkt(c, c.mkpkt(34, []byte("pong"))) xsendPkt(c, c.mkpkt(34, b("pong")))
// one more time // one more time
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
t.xverifyPkt(pkt, c.connId, 35, []byte("ping2")) t.xverifyPkt(pkt, c.connId, 35, b("ping2"))
xsendPkt(c, c.mkpkt(36, []byte("pong2"))) xsendPkt(c, c.mkpkt(36, b("pong2")))
xclose(c) xclose(c)
closed <- 1 closed <- 1
...@@ -552,18 +564,18 @@ func _TestNodeLink(t *T) { ...@@ -552,18 +564,18 @@ func _TestNodeLink(t *T) {
// once again as ^^^ but finish only with CloseRecv // once again as ^^^ but finish only with CloseRecv
c2 := xaccept(nl2) c2 := xaccept(nl2)
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
t.xverifyPkt(pkt, c2.connId, 41, []byte("ping5")) t.xverifyPkt(pkt, c2.connId, 41, b("ping5"))
xsendPkt(c2, c2.mkpkt(42, []byte("pong5"))) xsendPkt(c2, c2.mkpkt(42, b("pong5")))
c2.CloseRecv() c2.CloseRecv()
closed <- 2 closed <- 2
// "connection refused" when trying to connect to not-listening peer // "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here? c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, c.mkpkt(38, []byte("pong3"))) xsendPkt(c, c.mkpkt(38, b("pong3")))
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
t.xverifyPktMsg(pkt, c.connId, errConnRefused) t.xverifyPktMsg(pkt, c.connId, errConnRefused)
xsendPkt(c, c.mkpkt(40, []byte("pong4"))) // once again xsendPkt(c, c.mkpkt(40, b("pong4"))) // once again
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
t.xverifyPktMsg(pkt, c.connId, errConnRefused) t.xverifyPktMsg(pkt, c.connId, errConnRefused)
...@@ -572,30 +584,30 @@ func _TestNodeLink(t *T) { ...@@ -572,30 +584,30 @@ func _TestNodeLink(t *T) {
}) })
c1 := xnewconn(nl1) c1 := xnewconn(nl1)
xsendPkt(c1, c1.mkpkt(33, []byte("ping"))) xsendPkt(c1, c1.mkpkt(33, b("ping")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
t.xverifyPkt(pkt, c1.connId, 34, []byte("pong")) t.xverifyPkt(pkt, c1.connId, 34, b("pong"))
xsendPkt(c1, c1.mkpkt(35, []byte("ping2"))) xsendPkt(c1, c1.mkpkt(35, b("ping2")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
t.xverifyPkt(pkt, c1.connId, 36, []byte("pong2")) t.xverifyPkt(pkt, c1.connId, 36, b("pong2"))
// "connection closed" after peer closed its end // "connection closed" after peer closed its end
<-closed <-closed
xsendPkt(c1, c1.mkpkt(37, []byte("ping3"))) xsendPkt(c1, c1.mkpkt(37, b("ping3")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
t.xverifyPktMsg(pkt, c1.connId, errConnClosed) t.xverifyPktMsg(pkt, c1.connId, errConnClosed)
xsendPkt(c1, c1.mkpkt(39, []byte("ping4"))) // once again xsendPkt(c1, c1.mkpkt(39, b("ping4"))) // once again
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
t.xverifyPktMsg(pkt, c1.connId, errConnClosed) t.xverifyPktMsg(pkt, c1.connId, errConnClosed)
// XXX also should get EOF on recv // XXX also should get EOF on recv
// one more time but now peer does only .CloseRecv() // one more time but now peer does only .CloseRecv()
c2 := xnewconn(nl1) c2 := xnewconn(nl1)
xsendPkt(c2, c2.mkpkt(41, []byte("ping5"))) xsendPkt(c2, c2.mkpkt(41, b("ping5")))
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
t.xverifyPkt(pkt, c2.connId, 42, []byte("pong5")) t.xverifyPkt(pkt, c2.connId, 42, b("pong5"))
<-closed <-closed
xsendPkt(c2, c2.mkpkt(41, []byte("ping6"))) xsendPkt(c2, c2.mkpkt(41, b("ping6")))
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
t.xverifyPktMsg(pkt, c2.connId, errConnClosed) t.xverifyPktMsg(pkt, c2.connId, errConnClosed)
...@@ -660,13 +672,13 @@ func _TestNodeLink(t *T) { ...@@ -660,13 +672,13 @@ func _TestNodeLink(t *T) {
c1 = xnewconn(nl1) c1 = xnewconn(nl1)
c2 = xnewconn(nl1) c2 = xnewconn(nl1)
xsendPkt(c1, c1.mkpkt(1, []byte(""))) xsendPkt(c1, c1.mkpkt(1, b("")))
xsendPkt(c2, c2.mkpkt(2, []byte(""))) xsendPkt(c2, c2.mkpkt(2, b("")))
// replies must be coming in reverse order // replies must be coming in reverse order
xechoWait := func(c *Conn, msgCode uint16) { xechoWait := func(c *Conn, msgCode uint16) {
pkt := xrecvPkt(c) pkt := xrecvPkt(c)
t.xverifyPkt(pkt, c.connId, msgCode, []byte("")) t.xverifyPkt(pkt, c.connId, msgCode, b(""))
} }
xechoWait(c2, 2) xechoWait(c2, 2)
xechoWait(c1, 1) xechoWait(c1, 1)
......
...@@ -58,14 +58,15 @@ var pktBufPool = sync.Pool{New: func() interface{} { ...@@ -58,14 +58,15 @@ var pktBufPool = sync.Pool{New: func() interface{} {
}} }}
// pktAlloc allocates pktBuf with len=n. // pktAlloc allocates pktBuf with len=n.
//
// n must be >= sizeof(proto.PktHeader).
func pktAlloc(n int) *pktBuf { func pktAlloc(n int) *pktBuf {
if n < proto.PktHeaderLen { // make sure cap >= PktHeaderLen.
panic("pktAlloc: n < sizeof(PktHeader)") // see HeaderN for why
l := n
if l < proto.PktHeaderLen {
l = proto.PktHeaderLen
} }
pkt := pktBufPool.Get().(*pktBuf) pkt := pktBufPool.Get().(*pktBuf)
pkt.data = xbytes.Realloc(pkt.data, n) pkt.data = xbytes.Realloc(pkt.data, l)[:n]
return pkt return pkt
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment