Commit 21902468 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 599d17bc
...@@ -47,20 +47,15 @@ type NodeLink struct { ...@@ -47,20 +47,15 @@ type NodeLink struct {
peerLink net.Conn // raw conn to peer peerLink net.Conn // raw conn to peer
connMu sync.Mutex // TODO -> RW ? connMu sync.Mutex // TODO -> RW ?
connTab map[uint32]*Conn // connid -> connection associated with connid connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ? handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ?
txreq chan txReq // tx requests from Conns go here txreq chan txReq // tx requests from Conns go via here
// (received pkt go dispatched to connTab[connid].rxq)
closed chan struct{} closed chan struct{}
} }
// Conn is a connection established over NodeLink // Conn is a connection established over NodeLink
// //
// Data can be sent and received over it. // Data can be sent and received over it.
...@@ -70,8 +65,8 @@ type NodeLink struct { ...@@ -70,8 +65,8 @@ type NodeLink struct {
type Conn struct { type Conn struct {
nodeLink *NodeLink nodeLink *NodeLink
connId uint32 connId uint32
rxq chan *PktBuf rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit errors go back here txerr chan error // transmit errors for this Conn go back here
closed chan struct{} closed chan struct{}
} }
...@@ -110,9 +105,9 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink { ...@@ -110,9 +105,9 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
var nextConnId uint32 var nextConnId uint32
switch role { switch role {
case ConnServer: case ConnServer:
nextConnId = 0 nextConnId = 0 // all initiated by us connId will be even
case ConnClient: case ConnClient:
nextConnId = 1 nextConnId = 1 // ----//---- odd
default: default:
panic("invalid conn role") panic("invalid conn role")
} }
...@@ -134,17 +129,21 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink { ...@@ -134,17 +129,21 @@ func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink {
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
close(nl.closed) close(nl.closed)
err := nl.peerLink.Close() err := nl.peerLink.Close()
// TODO wait for serve{Send,Recv} to complete
//nl.wg.Wait()
// close active Conns // close active Conns
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
for _, conn := range nl.connTab { for _, conn := range nl.connTab {
// FIXME it also wants to lock conntab -> conn.close() ? // FIXME it also wants to lock conntab -> conn.close() ?
println("conn", conn.connId, " -> closing ...") println("conn", conn.connId, " -> closing ...")
conn.Close() // XXX err // XXX only interrupt, not close? Or allow multiple conn.Close() ?
conn.close() // XXX err -> errv
} }
nl.connTab = nil // XXX ok? vs panic on NewConn after close ? nl.connTab = nil // XXX ok? vs panic on NewConn after close ?
// XXX wait for serve{Send,Recv} to complete
//nl.wg.Wait()
return err return err
} }
...@@ -203,18 +202,25 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -203,18 +202,25 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
return pkt, nil return pkt, nil
} }
// Make a connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn { // worker for NewConn() & friends. Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl, c := &Conn{nodeLink: nl,
connId: connId,
rxq: make(chan *PktBuf), rxq: make(chan *PktBuf),
txerr: make(chan error), txerr: make(chan error),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
nl.connTab[connId] = c
return c
}
// Create a connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn {
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
c.connId = nl.nextConnId c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2 nl.nextConnId += 2
nl.connTab[c.connId] = c
return c return c
} }
...@@ -235,20 +241,29 @@ func (nl *NodeLink) serveRecv() { ...@@ -235,20 +241,29 @@ func (nl *NodeLink) serveRecv() {
panic(err) // XXX err panic(err) // XXX err
} }
// if we don't yet have connection established for pkt.ConnId - // pkt.ConnId -> Conn
// spawn connection-serving goroutine connId := ntoh32(pkt.Header().ConnId)
// XXX connTab locking accept := false
conn := nl.connTab[ntoh32(pkt.Header().ConnId)]
nl.connMu.Lock()
conn := nl.connTab[connId]
if conn == nil && nl.handleNewConn != nil {
conn = nl.newConn(connId)
accept = true
}
nl.connMu.Unlock()
// we have not accepted incoming connection - ignore packet
if conn == nil { if conn == nil {
if nl.handleNewConn == nil { // XXX also log?
// we are not accepting incoming connections - ignore packet continue
// XXX also log? }
continue
}
conn = nl.NewConn() // we are accepting new incoming connection - spawn
// connection-serving goroutine
if accept {
// TODO avoid spawning goroutine for each new Ask request - // TODO avoid spawning goroutine for each new Ask request -
// - by keeping pool of read inactive goroutine / conn pool // - by keeping pool of read inactive goroutine / conn pool ?
go nl.handleNewConn(conn) go nl.handleNewConn(conn)
} }
...@@ -295,8 +310,7 @@ var ErrClosedConn = errors.New("read/write on closed connection") ...@@ -295,8 +310,7 @@ var ErrClosedConn = errors.New("read/write on closed connection")
// Send packet via connection // Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error { func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connid associated with this connection // set pkt connId associated with this connection
// TODO for new Conn - it should be set by serveSend ?
pkt.Header().ConnId = hton32(c.connId) pkt.Header().ConnId = hton32(c.connId)
select { select {
...@@ -324,13 +338,20 @@ func (c *Conn) Recv() (*PktBuf, error) { ...@@ -324,13 +338,20 @@ func (c *Conn) Recv() (*PktBuf, error) {
} }
} }
// worker for Close() & co
func (c *Conn) close() {
close(c.closed) // XXX better just close c.rxq + ??? for tx
}
// Close connection // Close connection
// Any blocked Send() or Recv() will be unblocked and return error // Any blocked Send() or Recv() will be unblocked and return error
// XXX Send() - if started - will first complete (not to break framing) // XXX Send() - if started - will first complete (not to break framing)
func (c *Conn) Close() error { // XXX do we need error here? func (c *Conn) Close() error { // XXX do we need error here?
// TODO adjust c.nodeLink.connTab + more ? // adjust nodeLink.connTab
// XXX check for double close? c.nodeLink.connMu.Lock()
close(c.closed) // XXX better just close c.rxq + ??? for tx delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock()
c.close()
return nil return nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment