Commit 8caeaceb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 63a70d44
......@@ -15,9 +15,6 @@
package neo
import (
//"fmt"
//"encoding/binary"
"io"
"net"
"unsafe"
......@@ -67,13 +64,12 @@ type Conn struct {
// Buffer with packet data
type PktBuf struct {
//PktHead
Data []byte // whole packet data including all headers XXX -> Buf ?
}
// Get pointer to packet header
func (pkt *PktBuf) Header() *PktHead {
// XXX check len(Data) < PktHead ?
// XXX check len(Data) < PktHead ? -> no, Data has to be allocated with cap >= PktHeadLen
return (*PktHead)(unsafe.Pointer(&pkt.Data[0]))
}
......@@ -108,6 +104,8 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc)
// TODO cleanup lots of ntoh32(...)
// XXX do we need to retry if err is temporary?
// TODO on error framing is broken -> close connection / whole NodeLink ?
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)}
......@@ -117,10 +115,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
}
pkth := pkt.Header()
//pkt.Id = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
//pkt.Code = binary.BigEndian.Uint16(rxbuf[4:])
//pkt.Len = binary.BigEndian.Uint32(rxbuf[6:])
// XXX -> better PktHeader.Decode() ?
if ntoh32(pkth.Len) < PktHeadLen {
panic("TODO pkt.Len < PktHeadLen") // XXX err (length is a whole packet len with header)
}
......@@ -141,13 +137,21 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if n < len(pkt.Data) {
_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
if err != nil {
panic(err) // XXX err
return nil, err // XXX err adjust ?
}
}
return pkt, nil
}
// Close node-node link.
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
// TODO adjust connTab & friends
return nl.peerLink.Close()
}
// Make a connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn {
......@@ -197,14 +201,6 @@ func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
nl.handleNewConn = h // NOTE can change handler at runtime XXX do we need this?
}
// Close node-node link.
// IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error {
// TODO adjust connTab & friends
return nl.peerLink.Close()
}
// Send packet via connection
// XXX vs cancel
......@@ -215,8 +211,8 @@ func (c *Conn) Send(pkt *PktBuf) error {
}
// Receive packet from connection
// XXX vs cancel
func (c *Conn) Recv() (*PktBuf, error) {
// TODO also select on closech
pkt, ok := <-c.rxq
if !ok {
return nil, io.EOF // XXX check erroring & other errors?
......
......@@ -196,10 +196,39 @@ func TestNodeLink(t *testing.T) {
xwait(wgclose)
// test 1 channels on top of nodelink
// test channels on top of nodelink
nl1, nl2 = nodeLinkPipe()
c1 := nl1.NewConn()
// Close vs Recv
c := nl1.NewConn()
wg = WorkGroup()
wg.Gox(func() {
tdelay()
xclose(c)
})
pkt, err = c.Recv()
if !(pkt == nil && err == io.ErrClosedPipe) {
t.Fatalf("Conn.Recv() after close: pkt = %v err = %v", pkt, err)
}
xwait(wg)
// Close vs Send
c = nl1.NewConn()
wg = WorkGroup()
wg.Gox(func() {
tdelay()
xclose(c)
})
pkt = &PktBuf{[]byte("data")}
err = c.Send(pkt)
if err != io.ErrClosedPipe {
t.Fatalf("Conn.Send() after close: err = %v", err)
}
xwait(wg)
/*
nl2.HandleNewConn(func(c *Conn) {
pkt := xrecv(c) // XXX t.Fatal() must be called from main goroutine -> context.Cancel ?
// change pkt a bit (TODO) and send it back
......@@ -211,4 +240,5 @@ func TestNodeLink(t *testing.T) {
// TODO check pkt2 is pkt1 + small modification
// test 2 channels with replies comming in reversed time order
*/
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment