Commit d68e25a6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d4be13da
...@@ -19,7 +19,6 @@ import ( ...@@ -19,7 +19,6 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"unsafe"
) )
// NodeLink is a node-node link in NEO // NodeLink is a node-node link in NEO
...@@ -79,46 +78,48 @@ type Conn struct { ...@@ -79,46 +78,48 @@ type Conn struct {
// A role our end of NodeLink is intended to play // A role our end of NodeLink is intended to play
type LinkRole int type LinkRole int
const ( const (
LinkServer ConnRole = iota // link created as server LinkServer LinkRole = iota // link created as server
LinkClient // link created as client LinkClient // link created as client
// for testing: // for testing:
linkNoRecvSend ConnRole = 1<<16 // do not spawn serveRecv & serveSend linkNoRecvSend LinkRole = 1<<16 // do not spawn serveRecv & serveSend
linkFlagsMask ConnRole = (1<<32 - 1) << 16 linkFlagsMask LinkRole = (1<<32 - 1) << 16
) )
// Make a new NodeLink from already established net.Conn // Make a new NodeLink from already established net.Conn
// //
// role specifies how to treat conn - either as server or client one. // Role specifies how to treat our role on the link - either as client or
// The difference in between client and server roles is only in how connection // server one. The difference in between client and server roles is only in
// ids are allocated for connections initiated at our side: there is no overlap in identifiers if one side always allocates them as even and its peer as odd. in connId % 2 XXX text // how connection ids are allocated for connections initiated at our side:
// there is no conflict in identifiers if one side always allocates them as
// even (server) and its peer as odd (client).
// //
// Usually server role should be used for connections created via // Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial. // net.Listen/net.Accept and client role for connections created via net.Dial.
func NewNodeLink(conn net.Conn, role ConnRole) *NodeLink { func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
var nextConnId uint32 var nextConnId uint32
switch role&^connFlagsMask { switch role&^linkFlagsMask {
case ConnServer: case LinkServer:
nextConnId = 0 // all initiated by us connId will be even nextConnId = 0 // all initiated by us connId will be even
case ConnClient: case LinkClient:
nextConnId = 1 // ----//---- odd nextConnId = 1 // ----//---- odd
default: default:
panic("invalid conn role") panic("invalid conn role")
} }
nl := NodeLink{ nl := &NodeLink{
peerLink: conn, peerLink: conn,
connTab: map[uint32]*Conn{}, connTab: map[uint32]*Conn{},
nextConnId: nextConnId, nextConnId: nextConnId,
txreq: make(chan txReq), txreq: make(chan txReq),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
if role&connNoRecvSend == 0 { if role&linkNoRecvSend == 0 {
nl.serveWg.Add(2) nl.serveWg.Add(2)
go nl.serveRecv() go nl.serveRecv()
go nl.serveSend() go nl.serveSend()
} }
return &nl return nl
} }
// Close node-node link. // Close node-node link.
...@@ -138,9 +139,7 @@ func (nl *NodeLink) Close() error { ...@@ -138,9 +139,7 @@ func (nl *NodeLink) Close() error {
err := nl.peerLink.Close() err := nl.peerLink.Close()
// wait for serve{Send,Recv} to complete // wait for serve{Send,Recv} to complete
//fmt.Printf("%p serveWg.Wait ...\n", nl)
nl.serveWg.Wait() nl.serveWg.Wait()
//fmt.Printf("%p\t (wait) -> woken up\n", nl)
// XXX do we want to also Wait for handlers here? // XXX do we want to also Wait for handlers here?
// (problem is peerLink is closed first so this might cause handlers to see errors) // (problem is peerLink is closed first so this might cause handlers to see errors)
...@@ -174,15 +173,16 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -174,15 +173,16 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ?
} }
//println("read head:", n)
pkth := pkt.Header() pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ? // XXX -> better PktHeader.Decode() ?
if ntoh32(pkth.Len) < PktHeadLen { if ntoh32(pkth.Len) < PktHeadLen {
// TODO err + close nodelink (framing broken)
panic("TODO pkt.Len < PktHeadLen") // XXX err (length is a whole packet len with header) panic("TODO pkt.Len < PktHeadLen") // XXX err (length is a whole packet len with header)
} }
if ntoh32(pkth.Len) > MAX_PACKET_SIZE { if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
// TODO err + close nodelink (framing broken) (?)
panic("TODO message too big") // XXX err panic("TODO message too big") // XXX err
} }
...@@ -201,7 +201,6 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -201,7 +201,6 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ?
} }
//println("read data:", len(pkt.Data)-n)
} }
return pkt, nil return pkt, nil
...@@ -234,14 +233,12 @@ func (nl *NodeLink) NewConn() *Conn { ...@@ -234,14 +233,12 @@ func (nl *NodeLink) NewConn() *Conn {
// serveRecv handles incoming packets routing them to either appropriate // serveRecv handles incoming packets routing them to either appropriate
// already-established connection or to new serving goroutine. // already-established connection or to new handling goroutine.
func (nl *NodeLink) serveRecv() { func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
for { for {
// receive 1 packet // receive 1 packet
//println(nl, "serveRecv -> recv...")
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
//fmt.Printf("%p\t (recv) -> %v\n", nl, err)
if err != nil { if err != nil {
// this might be just error on close - simply stop in such case // this might be just error on close - simply stop in such case
select { select {
...@@ -254,7 +251,6 @@ func (nl *NodeLink) serveRecv() { ...@@ -254,7 +251,6 @@ func (nl *NodeLink) serveRecv() {
// pkt.ConnId -> Conn // pkt.ConnId -> Conn
connId := ntoh32(pkt.Header().ConnId) connId := ntoh32(pkt.Header().ConnId)
//fmt.Printf("%p\t (recv) -> connId: %v\n", nl, connId)
var handleNewConn func(conn *Conn) var handleNewConn func(conn *Conn)
nl.connMu.Lock() nl.connMu.Lock()
...@@ -278,7 +274,6 @@ func (nl *NodeLink) serveRecv() { ...@@ -278,7 +274,6 @@ func (nl *NodeLink) serveRecv() {
if handleNewConn != nil { if handleNewConn != nil {
// TODO avoid spawning goroutine for each new Ask request - // TODO avoid spawning goroutine for each new Ask request -
// - by keeping pool of read inactive goroutine / conn pool ? // - by keeping pool of read inactive goroutine / conn pool ?
//println("+ handle", connId)
go func() { go func() {
nl.handleWg.Add(1) nl.handleWg.Add(1)
defer nl.handleWg.Done() defer nl.handleWg.Done()
...@@ -310,24 +305,19 @@ func (nl *NodeLink) serveSend() { ...@@ -310,24 +305,19 @@ func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done() defer nl.serveWg.Done()
runloop: runloop:
for { for {
//fmt.Printf("%p serveSend -> select ...\n", nl)
select { select {
case <-nl.closed: case <-nl.closed:
//fmt.Printf("%p\t (send) -> closed\n", nl)
break runloop break runloop
case txreq := <-nl.txreq: case txreq := <-nl.txreq:
//fmt.Printf("%p\t (send) -> txreq\n", nl)
err := nl.sendPkt(txreq.pkt) err := nl.sendPkt(txreq.pkt)
//fmt.Printf("%p\t (send) -> err: %v\n", nl, err)
if err != nil { if err != nil {
// XXX also close whole nodeLink since tx framing now can be broken? // XXX also close whole nodeLink since tx framing now can be broken?
// -> not here - this logic should be in sendPkt
} }
txreq.errch <- err txreq.errch <- err
} }
} }
//fmt.Printf("%p\t (send) -> exit\n", nl)
} }
// XXX move to NodeLink ctor ? // XXX move to NodeLink ctor ?
...@@ -362,8 +352,8 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -362,8 +352,8 @@ func (c *Conn) Send(pkt *PktBuf) error {
// That's why we are also checking for c.closed while waiting // That's why we are also checking for c.closed while waiting
// for reply from serveSend (and leave pkt to finish transmitting). // for reply from serveSend (and leave pkt to finish transmitting).
// //
// NOTE after we return straight here serveSend won't be blocked on // NOTE after we return straight here serveSend won't be later
// c.txerr<- because that backchannel is a non-blocking one. // blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed: case <-c.closed:
return ErrClosedConn return ErrClosedConn
...@@ -392,13 +382,9 @@ func (c *Conn) Send(pkt *PktBuf) error { ...@@ -392,13 +382,9 @@ func (c *Conn) Send(pkt *PktBuf) error {
func (c *Conn) Recv() (*PktBuf, error) { func (c *Conn) Recv() (*PktBuf, error) {
select { select {
case <-c.closed: case <-c.closed:
// XXX closed c.rxq might be just indicator for this
return nil, ErrClosedConn return nil, ErrClosedConn
case pkt, ok := <-c.rxq: case pkt := <-c.rxq:
if !ok { // see ^^^
return nil, io.EOF // XXX check erroring & other errors?
}
return pkt, nil return pkt, nil
} }
} }
...@@ -406,14 +392,15 @@ func (c *Conn) Recv() (*PktBuf, error) { ...@@ -406,14 +392,15 @@ func (c *Conn) Recv() (*PktBuf, error) {
// worker for Close() & co // worker for Close() & co
func (c *Conn) close() { func (c *Conn) close() {
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
//fmt.Printf("%p Conn.close\n", c) close(c.closed)
close(c.closed) // XXX better just close c.rxq + ??? for tx
}) })
} }
// Close connection // Close connection
// Any blocked Send() or Recv() will be unblocked and return error // Any blocked Send() or Recv() will be unblocked and return error
// XXX Send() - if started - will first complete (not to break framing) XXX <- in background //
// NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break framing.
func (c *Conn) Close() error { func (c *Conn) Close() error {
// adjust nodeLink.connTab // adjust nodeLink.connTab
c.nodeLink.connMu.Lock() c.nodeLink.connMu.Lock()
......
...@@ -128,10 +128,10 @@ func tdelay() { ...@@ -128,10 +128,10 @@ func tdelay() {
} }
// create NodeLinks connected via net.Pipe // create NodeLinks connected via net.Pipe
func _nodeLinkPipe(flags1, flags2 ConnRole) (nl1, nl2 *NodeLink) { func _nodeLinkPipe(flags1, flags2 LinkRole) (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe() node1, node2 := net.Pipe()
nl1 = NewNodeLink(node1, ConnClient | flags1) nl1 = NewNodeLink(node1, LinkClient | flags1)
nl2 = NewNodeLink(node2, ConnServer | flags2) nl2 = NewNodeLink(node2, LinkServer | flags2)
return nl1, nl2 return nl1, nl2
} }
...@@ -143,7 +143,7 @@ func TestNodeLink(t *testing.T) { ...@@ -143,7 +143,7 @@ func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(connNoRecvSend, connNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := WorkGroup() wg := WorkGroup()
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
...@@ -157,7 +157,7 @@ func TestNodeLink(t *testing.T) { ...@@ -157,7 +157,7 @@ func TestNodeLink(t *testing.T) {
xclose(nl2) xclose(nl2)
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(connNoRecvSend, connNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
...@@ -172,7 +172,7 @@ func TestNodeLink(t *testing.T) { ...@@ -172,7 +172,7 @@ func TestNodeLink(t *testing.T) {
xclose(nl2) xclose(nl2)
// raw exchange // raw exchange
nl1, nl2 = _nodeLinkPipe(connNoRecvSend, connNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg, ctx := WorkGroupCtx(context.Background()) wg, ctx := WorkGroupCtx(context.Background())
wg.Gox(func() { wg.Gox(func() {
...@@ -205,7 +205,7 @@ func TestNodeLink(t *testing.T) { ...@@ -205,7 +205,7 @@ func TestNodeLink(t *testing.T) {
// Test connections on top of nodelink // Test connections on top of nodelink
// Close vs Recv // Close vs Recv
nl1, nl2 = _nodeLinkPipe(0, connNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c := nl1.NewConn() c := nl1.NewConn()
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
...@@ -221,7 +221,7 @@ func TestNodeLink(t *testing.T) { ...@@ -221,7 +221,7 @@ func TestNodeLink(t *testing.T) {
xclose(nl2) xclose(nl2)
// Close vs Send // Close vs Send
nl1, nl2 = _nodeLinkPipe(0, connNoRecvSend) nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = nl1.NewConn() c = nl1.NewConn()
wg = WorkGroup() wg = WorkGroup()
wg.Gox(func() { wg.Gox(func() {
......
...@@ -14,6 +14,12 @@ ...@@ -14,6 +14,12 @@
package neo package neo
import (
"unsafe"
)
// TODO organize rx buffers management (freelist etc)
// Buffer with packet data // Buffer with packet data
type PktBuf struct { type PktBuf struct {
Data []byte // whole packet data including all headers XXX -> Buf ? Data []byte // whole packet data including all headers XXX -> Buf ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment