Commit cb81558c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5467a295
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
package neo package neo
import ( import (
"encoding/binary"
"io"
"net" "net"
) )
...@@ -44,7 +46,7 @@ type NodeLink struct { // XXX naming (-> PeerLink ?) ...@@ -44,7 +46,7 @@ type NodeLink struct { // XXX naming (-> PeerLink ?)
// TODO locking // TODO locking
connTab map[uint32]*Conn // msgid -> connection associated with msgid connTab map[uint32]*Conn // msgid -> connection associated with msgid
handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http ?) handleNewConn func(conn *Conn) // handler for new connections XXX -> ConnHandler (a-la Handler in net/http) ?
// TODO peerLink .LocalAddr() vs .RemoteAddr() -> msgid even/odd ? (XXX vs NAT ?) // TODO peerLink .LocalAddr() vs .RemoteAddr() -> msgid even/odd ? (XXX vs NAT ?)
} }
...@@ -57,7 +59,7 @@ type NodeLink struct { // XXX naming (-> PeerLink ?) ...@@ -57,7 +59,7 @@ type NodeLink struct { // XXX naming (-> PeerLink ?)
// TODO goroutine guarantee (looks to be safe, but if not check whether we need it) // TODO goroutine guarantee (looks to be safe, but if not check whether we need it)
type Conn struct { type Conn struct {
nodeLink *NodeLink nodeLink *NodeLink
rxq chan Pkt // XXX chan &Pkt ? rxq chan *PktBuf
} }
// Buffer with packet data // Buffer with packet data
...@@ -67,52 +69,69 @@ type PktBuf struct { ...@@ -67,52 +69,69 @@ type PktBuf struct {
} }
// Send packet via connection // Make a new NodeLink from already established net.Conn
// XXX vs cancel func NewNodeLink(c net.Conn) *NodeLink {
func (Conn *c) Send(pkt Pkt) error { nl := NodeLink{
pkt.MsgId = 0 // TODO next msgid, or using same msgid as received peerLink: c,
_, err := c.nodeLink.peerLink.Write(pkt.WholeBuffer()) // TODO -> sendPkt(pkt) connTab: map[uint32]*Conn{},
}
// XXX run serveRecv() in a goroutine here?
return &nl
}
// send raw packet to peer
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
n, err := nl.peerLink.Write(pkt.WholeBuffer()) // XXX WholeBuffer
if err != nil { if err != nil {
// XXX do we need to retry if err is temporary?
// TODO data could be written partially and thus the message stream is now broken // TODO data could be written partially and thus the message stream is now broken
// -> close connection / whole NodeLink ? // -> close connection / whole NodeLink ?
} }
return err return err
} }
// Receive packet from connection // receive raw packet from peer
// XXX vs cancel func (nl *NodeLink) recvPkt() (pkt *PktBuf, err error) {
func (Conn *c) Recv() (PktBuf, error) { // TODO organize rx buffers management (freelist etc)
pkt, ok := <-rxq
if !ok { // first read to read pkt header and hopefully up to page of data in 1 syscall
return PktBuf{}, io.EOF // XXX check erroring & other errors? rxbuf := make([]byte, 4096)
n, err := io.ReadAtLeast(nl.peerLink, rxbuf, PktHeadLen)
if err != nil {
panic(err) // XXX err
} }
return pkt, nil
}
// Close connection pkt.Id = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
// Any blocked Send() or Recv() will be unblocked and return error pkt.Code = binary.BigEndian.Uint16(rxbuf[4:])
// XXX vs cancel pkt.Len = binary.BigEndian.Uint32(rxbuf[6:])
func (Conn *c) Close() error { // XXX do we need error here?
// TODO adjust c.nodeLink.connTab + more ?
// TODO interrupt Send/Recv
panic("TODO Conn.Close")
}
if pkt.Len < PktHeadLen {
panic("TODO pkt.Len < PktHeadLen") // XXX err (length is a whole packet len with header)
}
if pkt.Len > MAX_PACKET_SIZE {
panic("TODO message too big") // XXX err
}
if pkt.Len > uint32(len(rxbuf)) {
// grow rxbuf
rxbuf2 := make([]byte, pkt.Len)
copy(rxbuf2, rxbuf[:n])
rxbuf = rxbuf2
}
// Make a new NodeLink from already established net.Conn // read rest of pkt data, if we need to
func NewNodeLink(c net.Conn) *NodeLink { _, err = io.ReadFull(nl.peerLink, rxbuf[n:pkt.Len])
nl := NodeLink{ if err != nil {
peerLink: c, panic(err) // XXX err
connTab: {}, //make(map[uint32]*Conn),
} }
// XXX run serveRecv() in a goroutine here?
return &nl return pkt, nil
} }
// Make a connection on top of node-node link // Make a connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn { func (nl *NodeLink) NewConn() *Conn {
c := &Conn{nodeLink: nl, rxq: make(chan Pkt)} c := &Conn{nodeLink: nl, rxq: make(chan *PktBuf)}
// XXX locking // XXX locking
nl.connTab[0] = c // FIXME 0 -> msgid; XXX also check not a duplicate nl.connTab[0] = c // FIXME 0 -> msgid; XXX also check not a duplicate
return c return c
...@@ -159,44 +178,36 @@ func (nl *NodeLink) HandleNewConn(h func(*Conn)) { ...@@ -159,44 +178,36 @@ func (nl *NodeLink) HandleNewConn(h func(*Conn)) {
} }
// receive 1 packet from peer
func (c *NodeLink) recvPkt() (pkt Pkt, err error) {
// TODO organize rx buffers management (freelist etc)
// first read to read pkt header and hopefully up to page of data in 1 syscall
rxbuf := make([]byte, 4096)
n, err := io.ReadAtLeast(c.peerLink, rxbuf, PktHeadLen)
if err != nil {
panic(err) // XXX err
}
pkt.Id = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
pkt.Code = binary.BigEndian.Uint16(rxbuf[4:])
pkt.Length = binary.BigEndian.Uint32(rxbuf[6:])
if pkt.Length < PktHeadLen {
panic("TODO pkt.Length < PktHeadLen") // XXX err (length is a whole packet len with header)
}
if pkt.Length > MAX_PACKET_SIZE {
panic("TODO message too big") // XXX err
}
if pkt.Length > uint32(len(rxbuf)) { // Send packet via connection
// grow rxbuf // XXX vs cancel
rxbuf2 := make([]byte, pkt.Length) func (c *Conn) Send(pkt *PktBuf) error {
copy(rxbuf2, rxbuf[:n]) pkt.Id = 0 // TODO next msgid, or using same msgid as received
rxbuf = rxbuf2 err := c.nodeLink.sendPkt(pkt)
} return err // XXX do we need to adjust err ?
}
// read rest of pkt data, if we need to // Receive packet from connection
_, err = io.ReadFull(c.peerLink, rxbuf[n:pkt.Length]) // XXX vs cancel
if err != nil { func (c *Conn) Recv() (*PktBuf, error) {
panic(err) // XXX err pkt, ok := <-c.rxq
if !ok {
return nil, io.EOF // XXX check erroring & other errors?
} }
return pkt, nil return pkt, nil
} }
// Close connection
// Any blocked Send() or Recv() will be unblocked and return error
// XXX vs cancel
func (c *Conn) Close() error { // XXX do we need error here?
// TODO adjust c.nodeLink.connTab + more ?
// TODO interrupt Send/Recv
panic("TODO Conn.Close")
}
......
...@@ -44,7 +44,7 @@ func TestNodeLink(t *testing.T) { ...@@ -44,7 +44,7 @@ func TestNodeLink(t *testing.T) {
go func() { go func() {
err := nl1.sendPkt(...) err := nl1.sendPkt(...)
if err != nil { if err != nil {
t.Fatal(...) t.Fatal(...) // XXX bad in goroutine
} }
}() }()
pkt, err := nl2.recvPkt(...) pkt, err := nl2.recvPkt(...)
...@@ -54,6 +54,7 @@ func TestNodeLink(t *testing.T) { ...@@ -54,6 +54,7 @@ func TestNodeLink(t *testing.T) {
// TODO check pkt == what was sent // TODO check pkt == what was sent
// TODO also check ^^^ in opposite direction // TODO also check ^^^ in opposite direction
/*
// test 1 channels on top of nodelink // test 1 channels on top of nodelink
c1 := nl1.NewConn() c1 := nl1.NewConn()
nl2.HandleNewConn(func(c *Conn) { nl2.HandleNewConn(func(c *Conn) {
...@@ -67,4 +68,5 @@ func TestNodeLink(t *testing.T) { ...@@ -67,4 +68,5 @@ func TestNodeLink(t *testing.T) {
// TODO check pkt2 is pkt1 + small modification // TODO check pkt2 is pkt1 + small modification
// test 2 channels with replies comming in reversed time order // test 2 channels with replies comming in reversed time order
*/
} }
...@@ -127,7 +127,7 @@ type RowList []struct { ...@@ -127,7 +127,7 @@ type RowList []struct {
// XXX link request <-> answer ? // XXX link request <-> answer ?
// TODO ensure len(encoded packet header) == 10 // TODO ensure len(encoded packet header) == 10
type PktHead struct { type PktHead struct {
Id uint32 Id uint32 // XXX -> MsgId and same vvv ?
Code uint16 // XXX we don't need this as field - this is already encoded in type Code uint16 // XXX we don't need this as field - this is already encoded in type
Len uint32 // XXX we don't need this as field - only on the wire Len uint32 // XXX we don't need this as field - only on the wire
} }
......
...@@ -39,7 +39,7 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) { ...@@ -39,7 +39,7 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) {
n, err := conn.Read(rxbuf.Bytes()) n, err := conn.Read(rxbuf.Bytes())
*/ */
recvPkt() //recvPkt()
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment