Commit ee71bd1a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a35297b6
...@@ -20,6 +20,8 @@ import ( ...@@ -20,6 +20,8 @@ import (
"net" "net"
"sync" "sync"
"unsafe" "unsafe"
"fmt"
) )
// NodeLink is a node-node link in NEO // NodeLink is a node-node link in NEO
...@@ -50,7 +52,9 @@ type NodeLink struct { ...@@ -50,7 +52,9 @@ type NodeLink struct {
connTab map[uint32]*Conn // connId -> Conn associated with connId connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ? serveWg sync.WaitGroup
handleNewConn func(conn *Conn) // handler for new connections
txreq chan txReq // tx requests from Conns go via here txreq chan txReq // tx requests from Conns go via here
closed chan struct{} closed chan struct{}
...@@ -95,6 +99,10 @@ type ConnRole int ...@@ -95,6 +99,10 @@ type ConnRole int
const ( const (
ConnServer ConnRole = iota // connection created as server ConnServer ConnRole = iota // connection created as server
ConnClient // connection created as client ConnClient // connection created as client
// for testing: do not spawn serveRecv & serveSend
connNoRecvSend ConnRole = 1<<16
connFlagsMask ConnRole = (1<<32 - 1) << 16
) )
// Make a new NodeLink from already established net.Conn // Make a new NodeLink from already established net.Conn
...@@ -106,7 +114,7 @@ const ( ...@@ -106,7 +114,7 @@ const (
// net.Listen/net.Accept and client role for connections created via net.Dial. // net.Listen/net.Accept and client role for connections created via net.Dial.
func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink { func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
var nextConnId uint32 var nextConnId uint32
switch role { switch role&^connFlagsMask {
case ConnServer: case ConnServer:
nextConnId = 0 // all initiated by us connId will be even nextConnId = 0 // all initiated by us connId will be even
case ConnClient: case ConnClient:
...@@ -122,8 +130,11 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink { ...@@ -122,8 +130,11 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
txreq: make(chan txReq), txreq: make(chan txReq),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
// TODO go nl.serveRecv() if role&connNoRecvSend == 0 {
// TODO go nl.serveSend() nl.serveWg.Add(2)
go nl.serveRecv()
go nl.serveSend()
}
return &nl return &nl
} }
...@@ -133,17 +144,16 @@ func (nl *NodeLink) Close() error { ...@@ -133,17 +144,16 @@ func (nl *NodeLink) Close() error {
close(nl.closed) close(nl.closed)
err := nl.peerLink.Close() err := nl.peerLink.Close()
// TODO wait for serve{Send,Recv} to complete // wait for serve{Send,Recv} to complete
//nl.wg.Wait() fmt.Printf("%p serveWg.Wait ...\n", nl)
nl.serveWg.Wait()
fmt.Printf("%p\t (wait) -> woken up\n", nl)
// close active Conns // close active Conns
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
for _, conn := range nl.connTab { for _, conn := range nl.connTab {
// FIXME it also wants to lock conntab -> conn.close() ? conn.close()
println("conn", conn.connId, " -> closing ...")
// XXX only interrupt, not close? Or allow multiple conn.Close() ?
conn.close() // XXX err -> errv
} }
nl.connTab = nil // clear + mark closed nl.connTab = nil // clear + mark closed
return err return err
...@@ -233,9 +243,12 @@ func (nl *NodeLink) NewConn() *Conn { ...@@ -233,9 +243,12 @@ func (nl *NodeLink) NewConn() *Conn {
// serveRecv handles incoming packets routing them to either appropriate // serveRecv handles incoming packets routing them to either appropriate
// already-established connection or to new serving goroutine. // already-established connection or to new serving goroutine.
func (nl *NodeLink) serveRecv() { func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done()
for { for {
// receive 1 packet // receive 1 packet
println(nl, "serveRecv -> recv...")
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
fmt.Printf("%p\t (recv) -> %v\n", nl, err)
if err != nil { if err != nil {
// this might be just error on close - simply stop in such case // this might be just error on close - simply stop in such case
select { select {
...@@ -248,13 +261,15 @@ func (nl *NodeLink) serveRecv() { ...@@ -248,13 +261,15 @@ func (nl *NodeLink) serveRecv() {
// pkt.ConnId -> Conn // pkt.ConnId -> Conn
connId := ntoh32(pkt.Header().ConnId) connId := ntoh32(pkt.Header().ConnId)
accept := false var handleNewConn func(conn *Conn)
nl.connMu.Lock() nl.connMu.Lock()
conn := nl.connTab[connId] conn := nl.connTab[connId]
if conn == nil && nl.handleNewConn != nil { if conn == nil {
handleNewConn = nl.handleNewConn
if handleNewConn != nil {
conn = nl.newConn(connId) conn = nl.newConn(connId)
accept = true }
} }
nl.connMu.Unlock() nl.connMu.Unlock()
...@@ -266,10 +281,10 @@ func (nl *NodeLink) serveRecv() { ...@@ -266,10 +281,10 @@ func (nl *NodeLink) serveRecv() {
// we are accepting new incoming connection - spawn // we are accepting new incoming connection - spawn
// connection-serving goroutine // connection-serving goroutine
if accept { if handleNewConn != nil {
// TODO avoid spawning goroutine for each new Ask request - // TODO avoid spawning goroutine for each new Ask request -
// - by keeping pool of read inactive goroutine / conn pool ? // - by keeping pool of read inactive goroutine / conn pool ?
go nl.handleNewConn(conn) go handleNewConn(conn)
} }
// route packet to serving goroutine handler // route packet to serving goroutine handler
...@@ -287,12 +302,17 @@ type txReq struct { ...@@ -287,12 +302,17 @@ type txReq struct {
// serveSend handles requests to transmit packets from client connections and // serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link. // serially executes them over associated node link.
func (nl *NodeLink) serveSend() { func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done()
runloop:
for { for {
fmt.Printf("%p serveSend -> select ...\n", nl)
select { select {
case <-nl.closed: case <-nl.closed:
break fmt.Printf("%p\t (send) -> closed\n", nl)
break runloop
case txreq := <-nl.txreq: case txreq := <-nl.txreq:
fmt.Printf("%p\t (send) -> txreq\n", nl)
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
if err != nil { if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken? // XXX also close whole nodeLink since tx framing now can be broken?
...@@ -300,12 +320,15 @@ func (nl *NodeLink) serveSend() { ...@@ -300,12 +320,15 @@ func (nl *NodeLink) serveSend() {
txreq.errch <- err txreq.errch <- err
} }
} }
fmt.Printf("%p\t (send) -> exit\n", nl)
} }
// XXX move to NodeLink ctor // XXX move to NodeLink ctor ?
// Set handler for new incoming connections // Set handler for new incoming connections
func (nl *NodeLink) HandleNewConn(h func(*Conn)) { func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
// XXX locking nl.connMu.Lock()
defer nl.connMu.Unlock()
nl.handleNewConn = h // NOTE can change handler at runtime XXX do we need this? nl.handleNewConn = h // NOTE can change handler at runtime XXX do we need this?
} }
......
...@@ -88,7 +88,7 @@ func xwait(w interface { Wait() error }) { ...@@ -88,7 +88,7 @@ func xwait(w interface { Wait() error }) {
} }
// Prepare PktBuf with content // Prepare PktBuf with content
func mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))} pkt := &PktBuf{make([]byte, PktHeadLen + len(payload))}
h := pkt.Header() h := pkt.Header()
h.ConnId = hton32(connid) h.ConnId = hton32(connid)
...@@ -98,6 +98,11 @@ func mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { ...@@ -98,6 +98,11 @@ func mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
return pkt return pkt
} }
func mkpkt(msgcode uint16, payload []byte) *PktBuf {
// in Conn exchange connid is automatically set by Conn.Send
return _mkpkt(0, msgcode, payload)
}
// Verify PktBuf is as expected // Verify PktBuf is as expected
func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{} errv := xerr.Errorv{}
...@@ -126,13 +131,16 @@ func tdelay() { ...@@ -126,13 +131,16 @@ func tdelay() {
time.Sleep(1*time.Millisecond) time.Sleep(1*time.Millisecond)
} }
// create NodeLinks connected via net.Pipe func _nodeLinkPipe(flags ConnRole) (nl1, nl2 *NodeLink) {
func nodeLinkPipe() (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe() node1, node2 := net.Pipe()
nl1 = NewNodeLink(node1, ConnClient) nl1 = NewNodeLink(node1, ConnClient | flags)
nl2 = NewNodeLink(node2, ConnServer) nl2 = NewNodeLink(node2, ConnServer | flags)
return nl1, nl2 return nl1, nl2
} }
// create NodeLinks connected via net.Pipe
func nodeLinkPipe() (nl1, nl2 *NodeLink) {
return _nodeLinkPipe(0)
}
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
...@@ -141,6 +149,7 @@ func TestNodeLink(t *testing.T) { ...@@ -141,6 +149,7 @@ func TestNodeLink(t *testing.T) {
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
// Close vs recvPkt // Close vs recvPkt
println("111")
wg := WorkGroup() wg := WorkGroup()
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
...@@ -153,6 +162,7 @@ func TestNodeLink(t *testing.T) { ...@@ -153,6 +162,7 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
// Close vs sendPkt // Close vs sendPkt
println("222")
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
...@@ -166,12 +176,13 @@ func TestNodeLink(t *testing.T) { ...@@ -166,12 +176,13 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
// check raw exchange works // check raw exchange works
nl1, nl2 = nodeLinkPipe() println("333")
nl1, nl2 = _nodeLinkPipe(connNoRecvSend)
wg, ctx := WorkGroupCtx(context.Background()) wg, ctx := WorkGroupCtx(context.Background())
wg.Gox(func() { wg.Gox(func() {
// send ping; wait for pong // send ping; wait for pong
pkt := mkpkt(1, 2, []byte("ping")) pkt := _mkpkt(1, 2, []byte("ping"))
xsendPkt(nl1, pkt) xsendPkt(nl1, pkt)
pkt = xrecvPkt(nl1) pkt = xrecvPkt(nl1)
xverifyPkt(pkt, 3, 4, []byte("pong")) xverifyPkt(pkt, 3, 4, []byte("pong"))
...@@ -180,7 +191,7 @@ func TestNodeLink(t *testing.T) { ...@@ -180,7 +191,7 @@ func TestNodeLink(t *testing.T) {
// wait for ping; send pong // wait for ping; send pong
pkt = xrecvPkt(nl2) pkt = xrecvPkt(nl2)
xverifyPkt(pkt, 1, 2, []byte("ping")) xverifyPkt(pkt, 1, 2, []byte("ping"))
pkt = mkpkt(3, 4, []byte("pong")) pkt = _mkpkt(3, 4, []byte("pong"))
xsendPkt(nl2, pkt) xsendPkt(nl2, pkt)
}) })
...@@ -196,7 +207,8 @@ func TestNodeLink(t *testing.T) { ...@@ -196,7 +207,8 @@ func TestNodeLink(t *testing.T) {
xwait(wgclose) xwait(wgclose)
// test channels on top of nodelink // Test connections on top of nodelink
println("444")
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
// Close vs Recv // Close vs Recv
...@@ -250,19 +262,22 @@ func TestNodeLink(t *testing.T) { ...@@ -250,19 +262,22 @@ func TestNodeLink(t *testing.T) {
xclose(c12) xclose(c12)
xclose(nl2) // for completeness xclose(nl2) // for completeness
// Conn accept + exchange
/* nl1, nl2 = nodeLinkPipe()
nl2.HandleNewConn(func(c *Conn) { nl2.HandleNewConn(func(c *Conn) {
pkt := xrecv(c) // XXX t.Fatal() must be called from main goroutine -> context.Cancel ? // TODO raised err -> errch
// change pkt a bit (TODO) and send it back pkt := xrecv(c)
err = c.Send(pkt) // XXX err xverifyPkt(pkt, c.connId, 33, []byte("ping"))
c.Close() // XXX err
// change pkt a bit and send it back
xsend(c, mkpkt(34, []byte("pong")))
xclose(c)
}) })
c1.Send(pkt) // XXX err c1 := nl1.NewConn()
pkt2 := c1.Recv() // XXX err pkt = mkpkt(33, []byte("ping"))
// TODO check pkt2 is pkt1 + small modification xsend(c1, pkt)
pkt2 := xrecv(c1)
xverifyPkt(pkt2, c1.connId, 34, []byte("pong"))
// test 2 channels with replies comming in reversed time order // test 2 channels with replies comming in reversed time order
*/
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment