Commit c818ce8a authored by Kirill Smelkov's avatar Kirill Smelkov

X fixup/polish after NodeLink.Accept rework

parent 65b17bdc
...@@ -2,17 +2,21 @@ ...@@ -2,17 +2,21 @@
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your // it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation. // option) any later version, as published by the Free Software Foundation.
// //
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied // This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// NEO. Connection management
package neo package neo
// Connection management
import ( import (
"context" "context"
...@@ -33,14 +37,11 @@ import ( ...@@ -33,14 +37,11 @@ import (
// //
// New connection can be created with .NewConn() . Once connection is // New connection can be created with .NewConn() . Once connection is
// created and data is sent over it, on peer's side another corresponding // created and data is sent over it, on peer's side another corresponding
// new connection will be created - accepting first packet "request" - and all XXX -> Accept // new connection can be accepted via .Accept(), and all further communication
// further communication send/receive exchange will be happening in between // send/receive exchange will be happening in between those 2 connections.
// those 2 connections.
// //
// For a node to be able to accept new incoming connection it has to register XXX -> Accept // For a node to be able to accept new incoming connection it has to have
// corresponding handler with .HandleNewConn() . Without such handler // "server" role - see NewNodeLink() for details.
// registered the node will be able to only initiate new connections, not
// accept new ones from its peer.
// //
// A NodeLink has to be explicitly closed, once it is no longer needed. // A NodeLink has to be explicitly closed, once it is no longer needed.
// //
...@@ -53,9 +54,6 @@ type NodeLink struct { ...@@ -53,9 +54,6 @@ type NodeLink struct {
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
// handleWg sync.WaitGroup // for spawned handlers
// handleNewConn func(conn *Conn) // handler for new connections
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
// = nil if NodeLink is not accepting connections // = nil if NodeLink is not accepting connections
...@@ -95,11 +93,14 @@ const ( ...@@ -95,11 +93,14 @@ const (
// NewNodeLink makes a new NodeLink from already established net.Conn // NewNodeLink makes a new NodeLink from already established net.Conn
// //
// Role specifies how to treat our role on the link - either as client or // Role specifies how to treat our role on the link - either as client or
// server one. The difference in between client and server roles is only in XXX + acceptq // server one. The difference in between client and server roles are in:
// how connection ids are allocated for connections initiated at our side: //
// 1. how connection ids are allocated for connections initiated at our side:
// there is no conflict in identifiers if one side always allocates them as // there is no conflict in identifiers if one side always allocates them as
// even (server) and its peer as odd (client). // even (server) and its peer as odd (client).
// //
// 2. NodeLink.Accept() works only on server side.
//
// Usually server role should be used for connections created via // Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial. // net.Listen/net.Accept and client role for connections created via net.Dial.
func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink { func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
...@@ -135,8 +136,6 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -135,8 +136,6 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
// Close closes node-node link. // Close closes node-node link.
// IO on connections established over it is automatically interrupted with an error. // IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
// XXX what with .acceptq ?
// mark all active Conns as closed // mark all active Conns as closed
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
...@@ -153,9 +152,6 @@ func (nl *NodeLink) Close() error { ...@@ -153,9 +152,6 @@ func (nl *NodeLink) Close() error {
// wait for serve{Send,Recv} to complete // wait for serve{Send,Recv} to complete
nl.serveWg.Wait() nl.serveWg.Wait()
// XXX do we want to also Wait for handlers here?
// (problem is peerLink is closed first so this might cause handlers to see errors)
return err return err
} }
...@@ -233,7 +229,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -233,7 +229,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
func (nl *NodeLink) newConn(connId uint32) *Conn { func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl, c := &Conn{nodeLink: nl,
connId: connId, connId: connId,
rxq: make(chan *PktBuf), rxq: make(chan *PktBuf), // TODO buffering
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
closed: make(chan struct{}), closed: make(chan struct{}),
} }
...@@ -241,7 +237,7 @@ func (nl *NodeLink) newConn(connId uint32) *Conn { ...@@ -241,7 +237,7 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
return c return c
} }
// NewConn creates a connection on top of node-node link // NewConn creates new connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn { func (nl *NodeLink) NewConn() *Conn {
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
...@@ -253,7 +249,7 @@ func (nl *NodeLink) NewConn() *Conn { ...@@ -253,7 +249,7 @@ func (nl *NodeLink) NewConn() *Conn {
return c return c
} }
// ErrClosedLink is the error indicated for opertions on closed NodeLink // ErrLinkClosed is the error indicated for operations on closed NodeLink
var ErrLinkClosed = errors.New("node link closed") var ErrLinkClosed = errors.New("node link closed")
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
...@@ -268,7 +264,6 @@ func (nl *NodeLink) Accept() (*Conn, error) { ...@@ -268,7 +264,6 @@ func (nl *NodeLink) Accept() (*Conn, error) {
case <-nl.closed: case <-nl.closed:
return nil, ErrLinkClosed // XXX + op = Accept ? return nil, ErrLinkClosed // XXX + op = Accept ?
// XXX check acceptq != nil ?
case c := <-nl.acceptq: case c := <-nl.acceptq:
return c, nil return c, nil
} }
...@@ -294,16 +289,15 @@ func (nl *NodeLink) serveRecv() { ...@@ -294,16 +289,15 @@ func (nl *NodeLink) serveRecv() {
// pkt.ConnId -> Conn // pkt.ConnId -> Conn
connId := ntoh32(pkt.Header().ConnId) connId := ntoh32(pkt.Header().ConnId)
// var handleNewConn func(conn *Conn)
nl.connMu.Lock() nl.connMu.Lock()
conn := nl.connTab[connId] conn := nl.connTab[connId]
if conn == nil { if conn == nil {
//handleNewConn = nl.handleNewConn // XXX -> Accept
//if handleNewConn != nil {
if nl.acceptq != nil { if nl.acceptq != nil {
// we are accepting new incoming connection
conn = nl.newConn(connId) conn = nl.newConn(connId)
// XXX what if Accept exited because of just recently close(nl.closed)? // XXX what if Accept exited because of just recently close(nl.closed)?
// -> check nl.closed here too ?
nl.acceptq <- conn nl.acceptq <- conn
} }
} }
...@@ -315,32 +309,13 @@ func (nl *NodeLink) serveRecv() { ...@@ -315,32 +309,13 @@ func (nl *NodeLink) serveRecv() {
continue continue
} }
// // XXX -> accept
// // we are accepting new incoming connection - spawn
// // connection-serving goroutine
// if handleNewConn != nil {
// // TODO avoid spawning goroutine for each new Ask request -
// // - by keeping pool of read inactive goroutine / conn pool ?
// // XXX rework interface for this to be Accept-like ?
// go func() {
// nl.handleWg.Add(1)
// defer nl.handleWg.Done()
// handleNewConn(conn)
// }()
// }
// route packet to serving goroutine handler // route packet to serving goroutine handler
// XXX what if Conn.Recv exited because of just recently close(nl.closed) ? // XXX what if Conn.Recv exited because of just recently close(nl.closed) ?
// -> check nl.closed here too ?
conn.rxq <- pkt conn.rxq <- pkt
} }
} }
// // wait for all handlers spawned for accepted connections to complete
// // XXX naming -> WaitHandlers ?
// func (nl *NodeLink) Wait() {
// nl.handleWg.Wait()
// }
// request to transmit a packet. Result error goes back to errch // request to transmit a packet. Result error goes back to errch
type txReq struct { type txReq struct {
...@@ -369,14 +344,6 @@ runloop: ...@@ -369,14 +344,6 @@ runloop:
} }
} }
// // XXX move to NodeLink ctor ?
// // Set handler for new incoming connections
// func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
// nl.connMu.Lock()
// defer nl.connMu.Unlock()
// nl.handleNewConn = h // NOTE can change handler at runtime XXX do we need this?
// }
// ErrClosedConn is the error indicated for read/write operations on closed Conn // ErrClosedConn is the error indicated for read/write operations on closed Conn
var ErrClosedConn = errors.New("read/write on closed connection") var ErrClosedConn = errors.New("read/write on closed connection")
...@@ -460,7 +427,7 @@ func (c *Conn) Close() error { ...@@ -460,7 +427,7 @@ func (c *Conn) Close() error {
} }
// for convinience: Dial/Listen // for convenience: Dial/Listen
// Dial connects to address on named network and wrap the connection as NodeLink // Dial connects to address on named network and wrap the connection as NodeLink
// TODO +tls.Config // TODO +tls.Config
......
...@@ -2,17 +2,21 @@ ...@@ -2,17 +2,21 @@
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your // it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation. // option) any later version, as published by the Free Software Foundation.
// //
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied // This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// NEO. Connection management. Tests
package neo package neo
// Connection management. Tests
import ( import (
"bytes" "bytes"
...@@ -56,6 +60,12 @@ func xclose(c io.Closer) { ...@@ -56,6 +60,12 @@ func xclose(c io.Closer) {
exc.Raiseif(err) exc.Raiseif(err)
} }
func xaccept(nl *NodeLink) *Conn {
c, err := nl.Accept()
exc.Raiseif(err)
return c
}
func xsend(c *Conn, pkt *PktBuf) { func xsend(c *Conn, pkt *PktBuf) {
err := c.Send(pkt) err := c.Send(pkt)
exc.Raiseif(err) exc.Raiseif(err)
...@@ -287,10 +297,8 @@ func TestNodeLink(t *testing.T) { ...@@ -287,10 +297,8 @@ func TestNodeLink(t *testing.T) {
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = WorkGroup() wg = WorkGroup()
//nl2.HandleNewConn(func(c *Conn) {
wg.Gox(func() { wg.Gox(func() {
c, err := nl2.Accept() // XXX -> xaccept ? c := xaccept(nl2)
exc.Raiseif(err)
pkt := xrecv(c) pkt := xrecv(c)
xverifyPkt(pkt, c.connId, 33, []byte("ping")) xverifyPkt(pkt, c.connId, 33, []byte("ping"))
...@@ -312,14 +320,13 @@ func TestNodeLink(t *testing.T) { ...@@ -312,14 +320,13 @@ func TestNodeLink(t *testing.T) {
xsend(c, mkpkt(35, []byte("ping2"))) xsend(c, mkpkt(35, []byte("ping2")))
pkt = xrecv(c) pkt = xrecv(c)
xverifyPkt(pkt, c.connId, 36, []byte("pong2")) xverifyPkt(pkt, c.connId, 36, []byte("pong2"))
//nl2.Wait()
xwait(wg) xwait(wg)
xclose(c) xclose(c)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
// test 2 channels with replies comming in reversed time order // test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = WorkGroup() wg = WorkGroup()
replyOrder := map[uint16]struct { // "order" in which to process requests replyOrder := map[uint16]struct { // "order" in which to process requests
...@@ -331,11 +338,9 @@ func TestNodeLink(t *testing.T) { ...@@ -331,11 +338,9 @@ func TestNodeLink(t *testing.T) {
} }
close(replyOrder[2].start) close(replyOrder[2].start)
//nl2.HandleNewConn(func(c *Conn) {
wg.Gox(func() { wg.Gox(func() {
for _ = range replyOrder { for _ = range replyOrder {
c, err := nl2.Accept() c := xaccept(nl2)
exc.Raiseif(err)
wg.Gox(func() { wg.Gox(func() {
pkt := xrecv(c) pkt := xrecv(c)
...@@ -368,7 +373,6 @@ func TestNodeLink(t *testing.T) { ...@@ -368,7 +373,6 @@ func TestNodeLink(t *testing.T) {
} }
xechoWait(c2, 2) xechoWait(c2, 2)
xechoWait(c1, 1) xechoWait(c1, 1)
//nl2.Wait()
xwait(wg) xwait(wg)
xclose(c1) xclose(c1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment