Commit 89309b93 authored by Kirill Smelkov's avatar Kirill Smelkov

X Move message IO to Conn

parent c2a1b63a
......@@ -59,7 +59,7 @@ func (c *Client) LastTid() (zodb.Tid, error) {
// FIXME do not use global conn (see comment in openClientByURL)
// XXX open new conn for this particular req/reply ?
reply := neo.AnswerLastTransaction{}
err := neo.Ask(c.storConn, &neo.LastTransaction{}, &reply)
err := c.storConn.Ask(&neo.LastTransaction{}, &reply)
if err != nil {
return 0, err // XXX err ctx
}
......@@ -78,7 +78,7 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
}
resp := neo.AnswerGetObject{}
err = neo.Ask(c.storConn, &req, &resp)
err = c.storConn.Ask(&req, &resp)
if err != nil {
return nil, 0, err // XXX err context
}
......
......@@ -28,6 +28,8 @@ import (
"encoding/binary"
"fmt"
"reflect"
)
// NodeLink is a node-node link in NEO
......@@ -541,7 +543,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)}
// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
//n, err := io.ReadAtLeast(nl.peerLink, ptb.Data, PktHeadLen)
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
if err != nil {
return nil, err
......@@ -728,3 +730,113 @@ func (c *Conn) err(op string, e error) error {
}
return &ConnError{Conn: c, Op: op, Err: e}
}
// ---- exchange of messages ----
// Recv receives message
// it receives packet and decodes message from it
func (c *Conn) Recv() (Msg, error) {
// TODO use freelist for PktBuf
pkt, err := c.recvPkt()
if err != nil {
return nil, err
}
// decode packet
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
msgType := msgTypeRegistry[msgCode]
if msgType == nil {
err = fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX "decode" -> "recv: decode"?
return nil, &ConnError{Conn: c, Op: "decode", Err: err}
}
// TODO use free-list for decoded messages + when possible decode in-place
msg := reflect.New(msgType).Interface().(Msg)
_, err = msg.NEOMsgDecode(pkt.Payload())
if err != nil {
return nil, &ConnError{Conn: c, Op: "decode", Err: err}
}
return msg, nil
}
// Send sends message
// it encodes message into packet and sends it
func (c *Conn) Send(msg Msg) error {
l := msg.NEOMsgEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // TODO -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(msg.NEOMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
msg.NEOMsgEncode(buf.Payload())
// XXX why pointer?
// XXX more context in err? (msg type)
return c.sendPkt(&buf)
}
// Expect receives message and checks it is one of expected types
//
// if verification is successful the message is decoded inplace and returned
// which indicates index of received message.
//
// on error (-1, err) is returned
func (c *Conn) Expect(msgv ...Msg) (which int, err error) {
// XXX a bit dup wrt Recv
// TODO use freelist for PktBuf
pkt, err := c.recvPkt()
if err != nil {
return -1, err
}
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
for i, msg := range msgv {
if msg.NEOMsgCode() == msgCode {
_, err = msg.NEOMsgDecode(pkt.Payload())
if err != nil {
return -1, &ConnError{Conn: c, Op: "decode", Err: err}
}
return i, nil
}
}
// unexpected message
msgType := msgTypeRegistry[msgCode]
if msgType == nil {
return -1, &ConnError{c, "decode", fmt.Errorf("invalid msgCode (%d)", msgCode)}
}
// XXX also add which messages were expected ?
return -1, &ConnError{c, "recv", fmt.Errorf("unexpected message: %v", msgType)}
}
// Ask sends request and receives response
// It expects response to be exactly of resp type and errors otherwise
// XXX clarify error semantic (when Error is decoded)
// XXX do the same as Expect wrt respv ?
func (c *Conn) Ask(req Msg, resp Msg) error {
err := c.Send(req)
if err != nil {
return err
}
nerr := &Error{}
which, err := c.Expect(resp, nerr)
switch which {
case 0:
return nil
case 1:
return ErrDecode(nerr)
}
return err
}
......@@ -307,13 +307,13 @@ func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecov
// XXX cancel on ctx
recovery := neo.AnswerRecovery{}
err = neo.Ask(conn, &neo.Recovery{}, &recovery)
err = conn.Ask(&neo.Recovery{}, &recovery)
if err != nil {
return
}
resp := neo.AnswerPartitionTable{}
err = neo.Ask(conn, &neo.X_PartitionTable{}, &resp)
err = conn.Ask(&neo.X_PartitionTable{}, &resp)
if err != nil {
return
}
......@@ -486,7 +486,7 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify)
conn, _ := link.NewConn()
locked := neo.AnswerLockedTransactions{}
err = neo.Ask(conn, &neo.LockedTransactions{}, &locked)
err = conn.Ask(&neo.LockedTransactions{}, &locked)
if err != nil {
return
}
......@@ -498,7 +498,7 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify)
}
last := neo.AnswerLastIDs{}
err = neo.Ask(conn, &neo.LastIDs{}, &last)
err = conn.Ask(&neo.LastIDs{}, &last)
if err != nil {
return
}
......@@ -689,11 +689,11 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
}
idReq := neo.RequestIdentification{}
err = neo.Expect(conn, &idReq)
_, err = conn.Expect(&idReq)
if err != nil {
logf("identify: %v", err)
// XXX ok to let peer know error as is? e.g. even IO error on Recv?
err = neo.EncodeAndSend(conn, &neo.Error{neo.PROTOCOL_ERROR, err.Error()})
err = conn.Send(&neo.Error{neo.PROTOCOL_ERROR, err.Error()})
if err != nil {
logf("failed to send error: %v", err)
}
......@@ -714,7 +714,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
}
// let the peer know identification result
err = neo.EncodeAndSend(conn, idResp)
err = conn.Send(idResp)
if err != nil {
return
}
......@@ -776,7 +776,7 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
// changed = true
}
err = neo.EncodeAndSend(connNotify, msg)
err = connNotify.Send(msg)
if err != nil {
// XXX err
}
......
......@@ -113,7 +113,7 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
}()
req := neo.RequestIdentification{}
err = neo.Expect(conn, &req)
_, err = conn.Expect(&req)
if err != nil {
return nodeInfo, err
}
......@@ -122,7 +122,7 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
// TODO hook here in logic to check identification request, assign nodeID etc
err = neo.EncodeAndSend(conn, &neo.AcceptIdentification{
err = conn.Send(&neo.AcceptIdentification{
NodeType: myNodeType,
MyNodeUUID: 0, // XXX
NumPartitions: 1, // XXX
......
......@@ -155,7 +155,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) error {
if err != nil { panic(err) } // XXX
for {
notify, err := neo.RecvAndDecode(conn)
notify, err := conn.Recv()
if err != nil {
// XXX TODO
}
......@@ -261,7 +261,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
}()
for {
req, err := neo.RecvAndDecode(conn)
req, err := conn.Recv()
if err != nil {
return // XXX log / err / send error before closing
}
......@@ -296,7 +296,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
}
}
neo.EncodeAndSend(conn, reply) // XXX err
conn.Send(reply) // XXX err
case *neo.LastTransaction:
var reply neo.Msg
......@@ -308,7 +308,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
reply = &neo.AnswerLastTransaction{lastTid}
}
neo.EncodeAndSend(conn, reply) // XXX err
conn.Send(reply) // XXX err
//case *ObjectHistory:
//case *StoreObject:
......
......@@ -3,130 +3,14 @@ package neo
import (
"fmt"
"reflect"
"../zodb"
"lab.nexedi.com/kirr/go123/xerr"
)
// Recv receives packet and decodes message from it
func RecvAndDecode(conn *Conn) (Msg, error) {
pkt, err := conn.recvPkt()
if err != nil {
return nil, err
}
// decode packet
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
msgType := msgTypeRegistry[msgCode]
if msgType == nil {
err = fmt.Errorf("invalid msgCode (%d)", msgCode)
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
// TODO use free-list for decoded packets + when possible decode in-place
msg := reflect.New(msgType).Interface().(Msg)
_, err = msg.NEOMsgDecode(pkt.Payload())
if err != nil {
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
}
return msg, nil
}
// EncodeAndSend encodes message into packet and sends it
func EncodeAndSend(conn *Conn, msg Msg) error {
l := msg.NEOMsgEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(msg.NEOMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
msg.NEOMsgEncode(buf.Payload())
return conn.sendPkt(&buf) // XXX why pointer?
}
// Ask does simple request/response protocol exchange
// It expects the answer to be exactly of resp type and errors otherwise
func Ask(conn *Conn, req Msg, resp Msg) error {
err := EncodeAndSend(conn, req)
if err != nil {
return err
}
err = Expect(conn, resp) // XXX +Error
return err
}
// ProtoError is returned when there was a protocol error, like receiving
// unexpected packet or packet with wrong header
// FIXME -> ConnError{Op: "decode"}
type ProtoError struct {
Conn *Conn
Err error
}
func (e *ProtoError) Error() string {
return fmt.Sprintf("%v: %v", e.Conn, e.Err)
}
// Expect receives 1 packet and expects it to be exactly of msg type
// XXX naming (-> Recv1 ?)
func Expect(conn *Conn, msg Msg) (err error) {
pkt, err := conn.recvPkt()
if err != nil {
return err
}
// received ok. Now it is all decoding
// XXX dup wrt RecvAndDecode
pkth := pkt.Header()
msgCode := ntoh16(pkth.MsgCode)
if msgCode != msg.NEOMsgCode() {
// unexpected Error response
if msgCode == (&Error{}).NEOMsgCode() {
errResp := Error{}
_, err = errResp.NEOMsgDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
// FIXME clarify error decoding logic:
// - in some cases Error is one of "expected" answers (e.g. Ask(GetObject))
// - in other cases Error is completely not expected
// (e.g. getting 1st packet on connection)
return ErrDecode(&errResp) // XXX err ctx vs ^^^ errcontextf ?
}
msgType := msgTypeRegistry[msgCode]
if msgType == nil {
return &ProtoError{conn, fmt.Errorf("invalid msgCode (%d)", msgCode)}
}
return &ProtoError{conn, fmt.Errorf("unexpected packet: %v", msgType)}
}
_, err = msg.NEOMsgDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
return nil
}
// ------------------------------------------
// XXX place=?
// XXX place=? -> methods of Error
// errEncode translates an error into Error packet
// XXX more text describing relation with zodb errors
......@@ -178,7 +62,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus
}()
accept = &AcceptIdentification{}
err = Ask(conn, &RequestIdentification{
err = conn.Ask(&RequestIdentification{
NodeType: myInfo.NodeType,
NodeUUID: myInfo.NodeUUID,
Address: myInfo.Address,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment