Commit 39982595 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/neonet: User-visible Send/Recv + Ask/Expect

Provide Conn.Send and Conn.Recv which work on NEO messages and
automatically encode/decode them into packets on the fly.

Similarly to NEO/py also provide Ask to send a request and receive
expected reply and Expect which does only the latter half of Ask.
parent e8396003
...@@ -31,6 +31,9 @@ ...@@ -31,6 +31,9 @@
// new connection can be accepted via link.Accept(), and all further communication // new connection can be accepted via link.Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections. // send/receive exchange will be happening in between those 2 connections.
// //
// Use conn.Send and conn.Recv to actually exchange messages. See Conn
// documentation for other message-exchange utilities like Ask and Expect.
//
// See also package lab.nexedi.com/kirr/neo/go/neo/proto for definition of NEO // See also package lab.nexedi.com/kirr/neo/go/neo/proto for definition of NEO
// messages. // messages.
package neonet package neonet
...@@ -59,6 +62,7 @@ import ( ...@@ -59,6 +62,7 @@ import (
"io" "io"
"math" "math"
"net" "net"
"reflect"
//"runtime" //"runtime"
"sync" "sync"
"time" "time"
...@@ -1003,6 +1007,7 @@ func (nl *NodeLink) serveSend() { ...@@ -1003,6 +1007,7 @@ func (nl *NodeLink) serveSend() {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it. // so we shut down node link and all connections over it.
// //
// XXX dup wrt sendPktDirect
// XXX move to link.sendPkt? // XXX move to link.sendPkt?
if err != nil { if err != nil {
nl.shutdown() nl.shutdown()
...@@ -1012,6 +1017,38 @@ func (nl *NodeLink) serveSend() { ...@@ -1012,6 +1017,38 @@ func (nl *NodeLink) serveSend() {
} }
} }
// ---- transmit direct mode ----
// serveSend is not strictly needed - net.Conn.Write already can be used by multiple
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see pool.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
//
// thus the only reason we use serveSend is so that Conn.Close can "interrupt" Conn.Send via .txdown .
// however this adds overhead and is not needed in light mode.
// sendPktDirect sends raw packet with appropriate connection ID directly via link.
func (c *Conn) sendPktDirect(pkt *pktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = packed.Hton32(c.connId)
// NOTE if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
err := c.link.sendPkt(pkt)
//fmt.Printf("sendPkt -> %v\n", err)
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
//
// XXX dup wrt serveSend
// XXX move to link.sendPkt?
if err != nil {
c.link.shutdown()
}
return err
}
// ---- raw IO ---- // ---- raw IO ----
const dumpio = false const dumpio = false
...@@ -1189,13 +1226,130 @@ func msgPack(connId uint32, msg proto.Msg) *pktBuf { ...@@ -1189,13 +1226,130 @@ func msgPack(connId uint32, msg proto.Msg) *pktBuf {
// TODO msgUnpack // TODO msgUnpack
// Recv receives message from the connection.
func (c *Conn) Recv() (proto.Msg, error) {
pkt, err := c.recvPkt()
if err != nil {
return nil, err
}
//defer pkt.Free()
msg, err := c._Recv(pkt)
pkt.Free()
return msg, err
}
func (c *Conn) _Recv(pkt *pktBuf) (proto.Msg, error) {
// decode packet
pkth := pkt.Header()
msgCode := packed.Ntoh16(pkth.MsgCode)
msgType := proto.MsgType(msgCode)
if msgType == nil {
err := fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX "decode" -> "recv: decode"?
return nil, c.err("decode", err)
}
// TODO use free-list for decoded messages + when possible decode in-place
msg := reflect.New(msgType).Interface().(proto.Msg)
// msg := reflect.NewAt(msgType, bufAlloc(msgType.Size())
_, err := msg.NEOMsgDecode(pkt.Payload())
if err != nil {
return nil, c.err("decode", err) // XXX "decode:" is already in ErrDecodeOverflow
}
return msg, nil
}
// sendMsg sends message with specified connection ID. // sendMsg sends message with specified connection ID.
// //
// it encodes message into packet, sets header appropriately and sends it. // it encodes message into packet, sets header appropriately and sends it.
// //
// it is ok to call sendMsg in parallel with serveSend. // it is ok to call sendMsg in parallel with serveSend. XXX link to sendPktDirect for rationale?
func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error { func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error {
buf := msgPack(connId, msg) buf := msgPack(connId, msg)
return link.sendPkt(buf) // XXX more context in err? (msg type) return link.sendPkt(buf) // XXX more context in err? (msg type)
// FIXME ^^^ shutdown whole link on error // FIXME ^^^ shutdown whole link on error
} }
// Send sends message over the connection.
func (c *Conn) Send(msg proto.Msg) error {
buf := msgPack(c.connId, msg)
return c.sendPkt(buf) // XXX more context in err? (msg type)
}
func (c *Conn) sendMsgDirect(msg proto.Msg) error {
return c.link.sendMsg(c.connId, msg)
}
// Expect receives message and checks it is one of expected types.
//
// If verification is successful the message is decoded inplace and returned
// which indicates index of received message.
//
// On error (-1, err) is returned.
func (c *Conn) Expect(msgv ...proto.Msg) (which int, err error) {
// XXX a bit dup wrt Recv
pkt, err := c.recvPkt()
if err != nil {
return -1, err
}
//defer pkt.Free()
which, err = c._Expect(pkt, msgv...)
pkt.Free()
return which, err
}
func (c *Conn) _Expect(pkt *pktBuf, msgv ...proto.Msg) (int, error) {
pkth := pkt.Header()
msgCode := packed.Ntoh16(pkth.MsgCode)
for i, msg := range msgv {
if msg.NEOMsgCode() == msgCode {
_, err := msg.NEOMsgDecode(pkt.Payload())
if err != nil {
return -1, c.err("decode", err)
}
return i, nil
}
}
// unexpected message
msgType := proto.MsgType(msgCode)
if msgType == nil {
return -1, c.err("decode", fmt.Errorf("invalid msgCode (%d)", msgCode))
}
// XXX also add which messages were expected ?
return -1, c.err("recv", fmt.Errorf("unexpected message: %v", msgType))
}
// Ask sends request and receives a response.
//
// It expects response to be either of resp type or proto.Error:
//
// If resp-type message is received, it is decoded inplace and nil is returned.
// If proto.Error message is received, it is returned as error.
//
// Otherwise returned error describes the problem.
//
// XXX return proto.Error explicitly?
func (c *Conn) Ask(req proto.Msg, resp proto.Msg) error {
err := c.Send(req)
if err != nil {
return err
}
nerr := &proto.Error{}
which, err := c.Expect(resp, nerr)
switch which {
case 0:
return nil
case 1:
return nerr
}
return err
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment