Commit 3ca87f70 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b4170a64
......@@ -15,9 +15,10 @@
package neo
import (
"encoding/binary"
//"encoding/binary"
"io"
"net"
"unsafe"
)
// NodeLink is a node-node link in NEO
......@@ -69,7 +70,7 @@ type PktBuf struct {
}
// Get pointer to packet header
func (pkt *PktBuf) Head() *PktHead {
func (pkt *PktBuf) Header() *PktHead {
// XXX check len(Data) < PktHead ?
return (*PktHead)(unsafe.Pointer(&pkt.Data[0]))
}
......@@ -87,7 +88,7 @@ func NewNodeLink(c net.Conn) *NodeLink {
// send raw packet to peer
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
n, err := nl.peerLink.Write(pkt.WholeBuffer()) // XXX WholeBuffer
_, err := nl.peerLink.Write(pkt.Data)
if err != nil {
// XXX do we need to retry if err is temporary?
// TODO data could be written partially and thus the message stream is now broken
......@@ -107,26 +108,27 @@ func (nl *NodeLink) recvPkt() (pkt *PktBuf, err error) {
panic(err) // XXX err
}
pkt.Id = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
pkt.Code = binary.BigEndian.Uint16(rxbuf[4:])
pkt.Len = binary.BigEndian.Uint32(rxbuf[6:])
pkth := pkt.Header()
//pkt.Id = binary.BigEndian.Uint32(rxbuf[0:]) // XXX -> PktHeader.Decode() ?
//pkt.Code = binary.BigEndian.Uint16(rxbuf[4:])
//pkt.Len = binary.BigEndian.Uint32(rxbuf[6:])
if pkt.Len < PktHeadLen {
if ntoh32(pkth.Len) < PktHeadLen {
panic("TODO pkt.Len < PktHeadLen") // XXX err (length is a whole packet len with header)
}
if pkt.Len > MAX_PACKET_SIZE {
if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
panic("TODO message too big") // XXX err
}
if pkt.Len > uint32(len(rxbuf)) {
if ntoh32(pkth.Len) > uint32(len(rxbuf)) {
// grow rxbuf
rxbuf2 := make([]byte, pkt.Len)
rxbuf2 := make([]byte, ntoh32(pkth.Len))
copy(rxbuf2, rxbuf[:n])
rxbuf = rxbuf2
}
// read rest of pkt data, if we need to
_, err = io.ReadFull(nl.peerLink, rxbuf[n:pkt.Len])
_, err = io.ReadFull(nl.peerLink, rxbuf[n:ntoh32(pkth.Len)])
if err != nil {
panic(err) // XXX err
}
......@@ -158,7 +160,7 @@ func (nl *NodeLink) serveRecv() error {
// if we don't yet have connection established for pkt.Id spawn connection-serving goroutine
// XXX connTab locking
conn := nl.connTab[pkt.Id]
conn := nl.connTab[ntoh32(pkt.Header().MsgId)]
if conn == nil {
if nl.handleNewConn == nil {
// we are not accepting incoming connections - ignore packet
......@@ -196,7 +198,7 @@ func (nl *NodeLink) Close() error {
// Send packet via connection
// XXX vs cancel
func (c *Conn) Send(pkt *PktBuf) error {
pkt.Id = 0 // TODO next msgid, or using same msgid as received
pkt.Header().MsgId = hton32(0) // TODO next msgid, or using same msgid as received
err := c.nodeLink.sendPkt(pkt)
return err // XXX do we need to adjust err ?
}
......
......@@ -169,7 +169,7 @@ type RequestIdentification struct {
UUID UUID
Address
Name string
IdTimestamp Float
IdTimestamp Float64
}
// XXX -> ReplyIdentification? RequestIdentification.Answer somehow ?
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment