Commit 3af68694 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8caeaceb
......@@ -15,6 +15,7 @@
package neo
import (
"errors"
"io"
"net"
"unsafe"
......@@ -49,6 +50,9 @@ type NodeLink struct {
handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ?
// TODO peerLink .LocalAddr() vs .RemoteAddr() -> msgid even/odd ? (XXX vs NAT ?)
txreq chan txReq // tx requests from Conns go here
closed chan struct{}
}
// Conn is a connection established over NodeLink
......@@ -60,6 +64,8 @@ type NodeLink struct {
type Conn struct {
nodeLink *NodeLink
rxq chan *PktBuf
txerr chan error // transmit errors go back here
closed chan struct{}
}
// Buffer with packet data
......@@ -84,11 +90,22 @@ func NewNodeLink(c net.Conn) *NodeLink {
nl := NodeLink{
peerLink: c,
connTab: map[uint32]*Conn{},
txreq: make(chan txReq),
closed: make(chan struct{}),
}
// XXX run serveRecv() in a goroutine here?
// TODO go nl.serveRecv()
// TODO go nl.serveSend()
return &nl
}
// Close node-node link.
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
// TODO adjust connTab & friends
close(nl.closed)
return nl.peerLink.Close()
}
// send raw packet to peer
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
_, err := nl.peerLink.Write(pkt.Data)
......@@ -144,41 +161,40 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
return pkt, nil
}
// Close node-node link.
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
// TODO adjust connTab & friends
return nl.peerLink.Close()
}
// Make a connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn {
c := &Conn{nodeLink: nl, rxq: make(chan *PktBuf)}
c := &Conn{nodeLink: nl,
rxq: make(chan *PktBuf),
txerr: make(chan error),
closed: make(chan struct{}),
}
// XXX locking
nl.connTab[0] = c // FIXME 0 -> msgid; XXX also check not a duplicate
return c
}
// ServeRecv handles incoming packets routing them to either appropriate
// serveRecv handles incoming packets routing them to either appropriate
// already-established connection or to new serving goroutine.
// TODO vs cancel
// XXX someone has to run serveRecv in a goroutine - XXX user or we internally ?
func (nl *NodeLink) serveRecv() error {
func (nl *NodeLink) serveRecv() {
for {
// receive 1 packet
pkt, err := nl.recvPkt()
if err != nil {
// this might be just error on close - simply stop in such case
select {
case <-nl.closed:
// XXX check err actually what is on interrupt?
return
}
panic(err) // XXX err
}
// if we don't yet have connection established for pkt.Id spawn connection-serving goroutine
// if we don't yet have connection established for pkt.MsgId spawn connection-serving goroutine
// XXX connTab locking
conn := nl.connTab[ntoh32(pkt.Header().MsgId)]
if conn == nil {
if nl.handleNewConn == nil {
if nl.handleNewConn == nil { // TODO check != nil in ctor, not here
// we are not accepting incoming connections - ignore packet
// XXX also log?
continue
......@@ -195,6 +211,32 @@ func (nl *NodeLink) serveRecv() error {
}
}
// request to transmit a packet. Result error goes back to errch
type txReq struct {
pkt *PktBuf
errch chan error
}
// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
for {
select {
case <-nl.closed:
return
case txreq := <-nl.txreq:
err := nl.sendPkt(txreq.pkt)
if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken?
}
txreq.errch <- err
}
}
}
// XXX move to NodeLink ctor
// Set handler for new incoming connections
func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
// XXX locking
......@@ -202,31 +244,45 @@ func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
}
// ErrClosedConn is the error indicated for read/write operations on closed Conn
var ErrClosedConn = errors.New("read/write on closed connection")
// Send packet via connection
// XXX vs cancel
func (c *Conn) Send(pkt *PktBuf) error {
pkt.Header().MsgId = hton32(0) // TODO next msgid, or using same msgid as received
err := c.nodeLink.sendPkt(pkt)
return err // XXX do we need to adjust err ?
select {
case <-c.closed:
return ErrClosedConn
case c.nodeLink.txreq <- txReq{pkt, c.txerr}:
err := <-c.txerr
return err // XXX adjust err with c?
}
}
// Receive packet from connection
func (c *Conn) Recv() (*PktBuf, error) {
// TODO also select on closech
pkt, ok := <-c.rxq
if !ok {
return nil, io.EOF // XXX check erroring & other errors?
select {
case <-c.closed:
// XXX closed c.rxq might be just indicator for this
return nil, ErrClosedConn
case pkt, ok := <-c.rxq:
if !ok { // see ^^^
return nil, io.EOF // XXX check erroring & other errors?
}
return pkt, nil
}
return pkt, nil
}
// Close connection
// Any blocked Send() or Recv() will be unblocked and return error
// XXX vs cancel
// XXX Send() - if started - will first complete (not to break framing)
func (c *Conn) Close() error { // XXX do we need error here?
// TODO adjust c.nodeLink.connTab + more ?
// TODO interrupt Send/Recv
panic("TODO Conn.Close")
// XXX check for double close?
close(c.closed) // XXX better just close c.rxq + ??? for tx
return nil
}
......
......@@ -207,7 +207,7 @@ func TestNodeLink(t *testing.T) {
xclose(c)
})
pkt, err = c.Recv()
if !(pkt == nil && err == io.ErrClosedPipe) {
if !(pkt == nil && err == ErrClosedConn) {
t.Fatalf("Conn.Recv() after close: pkt = %v err = %v", pkt, err)
}
xwait(wg)
......@@ -221,11 +221,13 @@ func TestNodeLink(t *testing.T) {
})
pkt = &PktBuf{[]byte("data")}
err = c.Send(pkt)
if err != io.ErrClosedPipe {
if err != ErrClosedConn {
t.Fatalf("Conn.Send() after close: err = %v", err)
}
xwait(wg)
// TODO check NodeLink.Close -> aborts Conn.Send/Recv
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment