Commit f9293ea7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d129e07f
...@@ -21,7 +21,7 @@ import ( ...@@ -21,7 +21,7 @@ import (
"sync" "sync"
"unsafe" "unsafe"
"fmt" //"fmt"
) )
// NodeLink is a node-node link in NEO // NodeLink is a node-node link in NEO
...@@ -34,7 +34,7 @@ import ( ...@@ -34,7 +34,7 @@ import (
// New connection can be created with .NewConn() . Once connection is // New connection can be created with .NewConn() . Once connection is
// created and data is sent over it, on peer's side another corresponding // created and data is sent over it, on peer's side another corresponding
// new connection will be created - accepting first packet "request" - and all // new connection will be created - accepting first packet "request" - and all
// further communication send/receive exchange will be happenning in between // further communication send/receive exchange will be happening in between
// those 2 connections. // those 2 connections.
// //
// For a node to be able to accept new incoming connection it has to register // For a node to be able to accept new incoming connection it has to register
...@@ -108,7 +108,7 @@ const ( ...@@ -108,7 +108,7 @@ const (
// Make a new NodeLink from already established net.Conn // Make a new NodeLink from already established net.Conn
// //
// role specifies how to treat conn - either as server or client one. // role specifies how to treat conn - either as server or client one.
// The differrence in between client and server roles are in connid % 2 XXX text // The difference in between client and server roles are in connid % 2 XXX text
// //
// Usually server role should be used for connections created via // Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial. // net.Listen/net.Accept and client role for connections created via net.Dial.
...@@ -141,21 +141,24 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink { ...@@ -141,21 +141,24 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
// Close node-node link. // Close node-node link.
// IO on connections established over it is automatically interrupted with an error. // IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
close(nl.closed) // mark all active Conns as closed
err := nl.peerLink.Close()
// wait for serve{Send,Recv} to complete
fmt.Printf("%p serveWg.Wait ...\n", nl)
nl.serveWg.Wait()
fmt.Printf("%p\t (wait) -> woken up\n", nl)
// close active Conns
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
for _, conn := range nl.connTab { for _, conn := range nl.connTab {
conn.close() conn.close()
} }
nl.connTab = nil // clear + mark closed nl.connTab = nil // clear + mark closed
// close actual link to peer
// this will wakeup serve{Send,Recv}
close(nl.closed)
err := nl.peerLink.Close()
// wait for serve{Send,Recv} to complete
//fmt.Printf("%p serveWg.Wait ...\n", nl)
nl.serveWg.Wait()
//fmt.Printf("%p\t (wait) -> woken up\n", nl)
return err return err
} }
...@@ -246,9 +249,9 @@ func (nl *NodeLink) serveRecv() { ...@@ -246,9 +249,9 @@ func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
for { for {
// receive 1 packet // receive 1 packet
println(nl, "serveRecv -> recv...") //println(nl, "serveRecv -> recv...")
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
fmt.Printf("%p\t (recv) -> %v\n", nl, err) //fmt.Printf("%p\t (recv) -> %v\n", nl, err)
if err != nil { if err != nil {
// this might be just error on close - simply stop in such case // this might be just error on close - simply stop in such case
select { select {
...@@ -305,16 +308,16 @@ func (nl *NodeLink) serveSend() { ...@@ -305,16 +308,16 @@ func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
runloop: runloop:
for { for {
fmt.Printf("%p serveSend -> select ...\n", nl) //fmt.Printf("%p serveSend -> select ...\n", nl)
select { select {
case <-nl.closed: case <-nl.closed:
fmt.Printf("%p\t (send) -> closed\n", nl) //fmt.Printf("%p\t (send) -> closed\n", nl)
break runloop break runloop
case txreq := <-nl.txreq: case txreq := <-nl.txreq:
fmt.Printf("%p\t (send) -> txreq\n", nl) //fmt.Printf("%p\t (send) -> txreq\n", nl)
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
fmt.Printf("%p\t (send) -> err: %v\n", nl, err) //fmt.Printf("%p\t (send) -> err: %v\n", nl, err)
if err != nil { if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken? // XXX also close whole nodeLink since tx framing now can be broken?
} }
...@@ -322,7 +325,7 @@ runloop: ...@@ -322,7 +325,7 @@ runloop:
} }
} }
fmt.Printf("%p\t (send) -> exit\n", nl) //fmt.Printf("%p\t (send) -> exit\n", nl)
} }
// XXX move to NodeLink ctor ? // XXX move to NodeLink ctor ?
...@@ -351,7 +354,7 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -351,7 +354,7 @@ func (c *Conn) Send(pkt *PktBuf) error {
select { select {
// tx request was sent to serveSend and is being transmitted on the wire. // tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and // the transmission may block for indefinitely long though and
// we cannot interrupt it as the only way to interrup is // we cannot interrupt it as the only way to interrupt is
// .nodeLink.Close() which will close all other Conns. // .nodeLink.Close() which will close all other Conns.
// //
// That's why we are also checking for c.closed while waiting // That's why we are also checking for c.closed while waiting
...@@ -366,9 +369,12 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -366,9 +369,12 @@ func (c *Conn) Send(pkt *PktBuf) error {
} }
} }
// if we got transmission error maybe it was due to underlying NodeLink // if we got transmission error chances are it was due to underlying NodeLink
// being closed. If our Conn was also requested to be closed adjust err // being closed. If our Conn was also requested to be closed adjust err
// to ErrClosedConn along the way. // to ErrClosedConn along the way.
//
// ( reaching here is theoretically possible if both c.closed and
// c.txerr are ready above )
if err != nil { if err != nil {
select { select {
case <-c.closed: case <-c.closed:
...@@ -398,7 +404,7 @@ func (c *Conn) Recv() (*PktBuf, error) { ...@@ -398,7 +404,7 @@ func (c *Conn) Recv() (*PktBuf, error) {
// worker for Close() & co // worker for Close() & co
func (c *Conn) close() { func (c *Conn) close() {
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
fmt.Printf("%p Conn.close\n", c) //fmt.Printf("%p Conn.close\n", c)
close(c.closed) // XXX better just close c.rxq + ??? for tx close(c.closed) // XXX better just close c.rxq + ??? for tx
}) })
} }
......
...@@ -238,8 +238,6 @@ func TestNodeLink(t *testing.T) { ...@@ -238,8 +238,6 @@ func TestNodeLink(t *testing.T) {
} }
xwait(wg) xwait(wg)
return
// NodeLink.Close vs Conn.Send/Recv // NodeLink.Close vs Conn.Send/Recv
c11 := nl1.NewConn() c11 := nl1.NewConn()
c12 := nl1.NewConn() c12 := nl1.NewConn()
...@@ -281,5 +279,9 @@ func TestNodeLink(t *testing.T) { ...@@ -281,5 +279,9 @@ func TestNodeLink(t *testing.T) {
pkt2 := xrecv(c1) pkt2 := xrecv(c1)
xverifyPkt(pkt2, c1.connId, 34, []byte("pong")) xverifyPkt(pkt2, c1.connId, 34, []byte("pong"))
xclose(c1)
xclose(nl1)
xclose(nl2)
// test 2 channels with replies comming in reversed time order // test 2 channels with replies comming in reversed time order
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment