Commit fbe26454 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3af68694
...@@ -101,9 +101,12 @@ func NewNodeLink(c net.Conn) *NodeLink { ...@@ -101,9 +101,12 @@ func NewNodeLink(c net.Conn) *NodeLink {
// Close node-node link. // Close node-node link.
// IO on connections established over it is automatically interrupted with an error. // IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
// TODO adjust connTab & friends
close(nl.closed) close(nl.closed)
return nl.peerLink.Close() err := nl.peerLink.Close()
// TODO close active Conns
// XXX wait for serve{Send,Recv} to complete
nl.wg.Wait()
} }
// send raw packet to peer // send raw packet to peer
...@@ -190,11 +193,12 @@ func (nl *NodeLink) serveRecv() { ...@@ -190,11 +193,12 @@ func (nl *NodeLink) serveRecv() {
panic(err) // XXX err panic(err) // XXX err
} }
// if we don't yet have connection established for pkt.MsgId spawn connection-serving goroutine // if we don't yet have connection established for pkt.MsgId -
// spawn connection-serving goroutine
// XXX connTab locking // XXX connTab locking
conn := nl.connTab[ntoh32(pkt.Header().MsgId)] conn := nl.connTab[ntoh32(pkt.Header().MsgId)]
if conn == nil { if conn == nil {
if nl.handleNewConn == nil { // TODO check != nil in ctor, not here if nl.handleNewConn == nil {
// we are not accepting incoming connections - ignore packet // we are not accepting incoming connections - ignore packet
// XXX also log? // XXX also log?
continue continue
...@@ -224,9 +228,10 @@ func (nl *NodeLink) serveSend() { ...@@ -224,9 +228,10 @@ func (nl *NodeLink) serveSend() {
for { for {
select { select {
case <-nl.closed: case <-nl.closed:
return break
case txreq := <-nl.txreq: case txreq := <-nl.txreq:
pkt.Header().MsgId = hton32(0) // TODO next msgid, or using same msgid as received
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
if err != nil { if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken? // XXX also close whole nodeLink since tx framing now can be broken?
...@@ -249,7 +254,6 @@ var ErrClosedConn = errors.New("read/write on closed connection") ...@@ -249,7 +254,6 @@ var ErrClosedConn = errors.New("read/write on closed connection")
// Send packet via connection // Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error { func (c *Conn) Send(pkt *PktBuf) error {
pkt.Header().MsgId = hton32(0) // TODO next msgid, or using same msgid as received
select { select {
case <-c.closed: case <-c.closed:
return ErrClosedConn return ErrClosedConn
......
...@@ -226,7 +226,27 @@ func TestNodeLink(t *testing.T) { ...@@ -226,7 +226,27 @@ func TestNodeLink(t *testing.T) {
} }
xwait(wg) xwait(wg)
// TODO check NodeLink.Close -> aborts Conn.Send/Recv // NodeLink.Close vs Conn.Send/Recv
c11 := nl1.NewConn()
c12 := nl1.NewConn()
wg = WorkGroup()
wg.Gox(func() {
pkt, err := c11.Recv()
if !(pkt == nil && err == ErrClosedConn) {
exc.Raisef("Conn.Recv() after NodeLink.close: pkt = %v err = %v", pkt, err)
}
})
wg.Gox(func() {
pkt := &PktBuf{[]byte("data")}
err := c12.Send(pkt)
if err != ErrClosedConn {
exc.Raisef("Conn.Send() after close: err = %v", err)
}
})
tdelay()
xclose(nl1)
xwait(wg)
xclose(nl2) // for completeness
/* /*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment