Commit 8ab1e541 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c5940f2e
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
package neo package neo
import ( import (
//"fmt"
//"encoding/binary" //"encoding/binary"
"io" "io"
"net" "net"
...@@ -66,7 +68,7 @@ type Conn struct { ...@@ -66,7 +68,7 @@ type Conn struct {
// Buffer with packet data // Buffer with packet data
type PktBuf struct { type PktBuf struct {
//PktHead //PktHead
Data []byte // whole packet data including all headers Data []byte // whole packet data including all headers XXX -> Buf ?
} }
// Get pointer to packet header // Get pointer to packet header
...@@ -75,6 +77,11 @@ func (pkt *PktBuf) Header() *PktHead { ...@@ -75,6 +77,11 @@ func (pkt *PktBuf) Header() *PktHead {
return (*PktHead)(unsafe.Pointer(&pkt.Data[0])) return (*PktHead)(unsafe.Pointer(&pkt.Data[0]))
} }
// Get packet payload
func (pkt *PktBuf) Payload() []byte {
return pkt.Data[PktHeadLen:]
}
// Make a new NodeLink from already established net.Conn // Make a new NodeLink from already established net.Conn
func NewNodeLink(c net.Conn) *NodeLink { func NewNodeLink(c net.Conn) *NodeLink {
...@@ -98,12 +105,13 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error { ...@@ -98,12 +105,13 @@ func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
} }
// receive raw packet from peer // receive raw packet from peer
func (nl *NodeLink) recvPkt() (pkt *PktBuf, err error) { func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc) // TODO organize rx buffers management (freelist etc)
// TODO cleanup lots of ntoh32(...)
// first read to read pkt header and hopefully up to page of data in 1 syscall // first read to read pkt header and hopefully up to page of data in 1 syscall
rxbuf := make([]byte, 4096) pkt := &PktBuf{make([]byte, 4096)}
n, err := io.ReadAtLeast(nl.peerLink, rxbuf, PktHeadLen) n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ?
} }
...@@ -120,18 +128,22 @@ func (nl *NodeLink) recvPkt() (pkt *PktBuf, err error) { ...@@ -120,18 +128,22 @@ func (nl *NodeLink) recvPkt() (pkt *PktBuf, err error) {
panic("TODO message too big") // XXX err panic("TODO message too big") // XXX err
} }
if ntoh32(pkth.Len) > uint32(len(rxbuf)) { if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
// grow rxbuf // grow rxbuf
rxbuf2 := make([]byte, ntoh32(pkth.Len)) rxbuf2 := make([]byte, ntoh32(pkth.Len))
copy(rxbuf2, rxbuf[:n]) copy(rxbuf2, pkt.Data[:n])
rxbuf = rxbuf2 pkt.Data = rxbuf2
} }
// cut .Data len to length of packet
pkt.Data = pkt.Data[:ntoh32(pkth.Len)]
// read rest of pkt data, if we need to // read rest of pkt data, if we need to
_, err = io.ReadFull(nl.peerLink, rxbuf[n:ntoh32(pkth.Len)]) if n < len(pkt.Data) {
_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
if err != nil { if err != nil {
panic(err) // XXX err panic(err) // XXX err
} }
}
return pkt, nil return pkt, nil
} }
......
...@@ -17,10 +17,15 @@ package neo ...@@ -17,10 +17,15 @@ package neo
import ( import (
//"fmt" //"fmt"
"bytes"
"context"
"io" "io"
"fmt"
"net" "net"
"testing" "testing"
"time" "time"
"golang.org/x/sync/errgroup"
) )
func xsend(c *Conn, pkt *PktBuf) { func xsend(c *Conn, pkt *PktBuf) {
...@@ -70,17 +75,30 @@ func xspawn(funcv ...func()) { ...@@ -70,17 +75,30 @@ func xspawn(funcv ...func()) {
} }
*/ */
// run f in a goroutine, see its return error, if != nil -> t.Fatal in main goroutine
func xgo(t *testing.T, f func() error) { // // run f in a goroutine; check its return error; if != nil -> t.Fatal in main goroutine
var err error // func xgo(t *testing.T, f func() error) {
done := make(chan struct{}) // var err error
go func() { // done := make(chan struct{})
err = f() // go func() {
close(done) // err = f()
}() // close(done)
<-done // if err != nil {
// panic(err) // XXX temp - see vvv
// }
// }()
// /* FIXME below just blocks main goroutine waiting for f() to complete
// <-done
// if err != nil {
// t.Fatal(err) // TODO adjust lineno (report calling location, not here)
// }
// */
// }
func xwait(t *testing.T, w interface { Wait() error }) {
err := w.Wait()
if err != nil { if err != nil {
t.Fatal(err) // TODO adjust lineno (report not here) t.Fatal(err) // TODO include caller location
} }
} }
...@@ -91,15 +109,21 @@ func tdelay() { ...@@ -91,15 +109,21 @@ func tdelay() {
time.Sleep(1*time.Millisecond) time.Sleep(1*time.Millisecond)
} }
// create NodeLinks connected via net.Pipe
func nodeLinkPipe() (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe()
nl1 = NewNodeLink(node1)
nl2 = NewNodeLink(node2)
return nl1, nl2
}
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// verify NodeLink via net.Pipe nl1, nl2 := nodeLinkPipe()
node1, node2 := net.Pipe()
nl1 := NewNodeLink(node1)
nl2 := NewNodeLink(node2)
// Close vs recv // Close vs recvPkt
xgo(t, func() error { g := &errgroup.Group{}
g.Go(func() error {
tdelay() tdelay()
return nl1.Close() return nl1.Close()
}) })
...@@ -107,56 +131,92 @@ func TestNodeLink(t *testing.T) { ...@@ -107,56 +131,92 @@ func TestNodeLink(t *testing.T) {
if !(pkt == nil && err == io.ErrClosedPipe) { if !(pkt == nil && err == io.ErrClosedPipe) {
t.Fatalf("NodeLink.recvPkt() after close: pkt = %v err = %v", pkt, err) t.Fatalf("NodeLink.recvPkt() after close: pkt = %v err = %v", pkt, err)
} }
xwait(t, g)
// Close vs send // Close vs sendPkt
xgo(t, func() error { g = &errgroup.Group{}
g.Go(func() error {
tdelay() tdelay()
return nl2.Close() return nl2.Close()
}) })
pkt = &PktBuf{[]byte("hello world")} pkt = &PktBuf{[]byte("data")}
err = nl2.sendPkt(pkt) err = nl2.sendPkt(pkt)
if err != io.ErrClosedPipe { if err != io.ErrClosedPipe {
t.Fatalf("NodeLink.sendPkt() after close: err = %v", err) t.Fatalf("NodeLink.sendPkt() after close: err = %v", err)
} }
xwait(t, g)
// TODO (?) every func: run with exception catcher (including t.Fatal)
/*
// TODO setup context
// TODO on context.cancel -> nl{1,2} -> Close
// TODO every func: run with exception catcher (including t.Fatal)
// if caught: // if caught:
// * ctx.cancel // * ctx.cancel
// * wait all for finish // * wait all for finish
// * rethrough in main // * rethrough in main
nl1, nl2 = nodeLinkPipe()
// first check raw exchange works g, ctx := errgroup.WithContext(context.Background())
// XXX move vvv also to g ?
go func() { go func() {
pkt = ... <-ctx.Done()
//time.Sleep(100*time.Millisecond)
t.Log("ctx was Done - closing nodelinks")
nl1.Close() // XXX err
nl2.Close() // XXX err
}()
// check raw exchange works
g.Go(func() error {
// send ping; wait for pong
pkt := &PktBuf{make([]byte, PktHeadLen + 4)}
pkth := pkt.Header()
pkth.Len = hton32(PktHeadLen + 4)
copy(pkt.Payload(), "ping")
err := nl1.sendPkt(pkt) err := nl1.sendPkt(pkt)
if err != nil { if err != nil {
t.Fatal(...) // XXX bad in goroutine t.Errorf("nl1.sendPkt: %v", err)
return err
} }
pkt, err = nl1.recvPkt() pkt, err = nl1.recvPkt()
if err != nil { if err != nil {
t.Fatal(...) t.Errorf("nl1.recvPkt: %v", err)
return err
} }
// TODO check pkt == what was sent back if !bytes.Equal(pkt.Data, []byte("pong")) {
}() // XXX vvv -> util ?
go func() { e := fmt.Errorf("nl1 received: %v ; want \"pong\"", pkt.Data)
t.Error(e)
return e
}
return nil
})
g.Go(func() error {
// wait for ping; send pong
pkt, err := nl2.recvPkt() pkt, err := nl2.recvPkt()
if err != nil { if err != nil {
t.Fatal(...) t.Errorf("nl2.recvPkt: %v", err)
return err
} }
// TODO check pkt == what was sent if !bytes.Equal(pkt.Data, []byte("ping")) {
// XXX vvv -> util ?
// TODO change pkt a bit e := fmt.Errorf("nl2 received: %v ; want \"ping\"", pkt.Data)
// send pkt back t.Error(e)
return e
}
pkt = &PktBuf{[]byte("pong")}
err = nl2.sendPkt(pkt) err = nl2.sendPkt(pkt)
if err != nil { if err != nil {
t.Fatal(...) // XXX bad in goroutine t.Errorf("nl2.sendPkt: %v", err)
return err
} }
return nil
})
xwait(t, g)
t.Fatal("bbb")
/*
err = g.Wait()
if err != nil {
t.Fatal("raw exchange verification failed")
} }
*/ */
/* /*
// test 1 channels on top of nodelink // test 1 channels on top of nodelink
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment