Commit 1f7deac2 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7d02826c
......@@ -18,8 +18,10 @@
// See https://www.nexedi.com/licensing for rationale and options.
package zeo
// marshalling of messages <-> wire encoding.
// see https://github.com/zopefoundation/ZEO/blob/5.2.1-20-gcb26281d/doc/protocol.rst
// Protocol for exchanging ZEO messages.
// On the wire messages are encoded via either pickles or msgpack.
// Each message is wrapped into packet with be32 header of whole packet size.
// See https://github.com/zopefoundation/ZEO/blob/5.2.1-20-gcb26281d/doc/protocol.rst for details.
import (
"bytes"
......@@ -35,9 +37,30 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
)
// msg represents 1 message.
// arg is arbitrary argument(s) passed/received along ZEO call or reply.
//
// for objects in arg user code has to be obtain them via encoding.as*() and
// set them via encoding.Tid(), encoding.Oid() and other similar methods that
// convert application-level data into objects properly corresponding to wire
// encoding of messages.
type msg struct {
msgid int64
flags msgFlags
method string
arg interface{} // can be e.g. tuple(arg1, arg2, ...)
}
type msgFlags int64
const (
msgAsync msgFlags = 1 // message does not need a reply
msgExcept = 2 // exception was raised on remote side (ZEO5)
)
// encoding represents messages encoding.
type encoding byte // Z - pickles, M - msgpack
// ---- message encode/decode ----
// ---- message encode/decode ↔ packet ----
// pktEncode encodes message into raw packet.
func (e encoding) pktEncode(m msg) *pktBuf {
......@@ -90,6 +113,7 @@ func pktEncodeM(m msg) *pktBuf {
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
/*
// XXX shamaton/msgpack encodes tuple(nil) as nil, not empty tuple
// XXX move to zLink.Call?
arg := m.arg
......@@ -98,6 +122,8 @@ func pktEncodeM(m msg) *pktBuf {
arg = tuple{}
}
dataArg, err := msgpack.Encode(arg)
*/
dataArg, err := msgpack.Encode(m.arg)
if err != nil {
panic(err) // all our types are expected to be supported by msgpack
}
......@@ -118,7 +144,7 @@ func pktDecodeZ(pkb *pktBuf) (msg, error) {
return m, err
}
tpkt, ok := xpkt.(pickle.Tuple) // XXX also list?
tpkt, ok := xpkt.(pickle.Tuple) // XXX also list? -> Z.asTuple(xpkt)
if !ok {
return m, derrf("got %T; expected tuple", xpkt)
}
......@@ -247,9 +273,9 @@ func derrf(format string, argv ...interface{}) error {
}
// ---- encode/decode for data types ----
// ---- retrieve/put objects from/into msg.arg ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
// xuint64Unpack tries to retrieve packed 8-byte string as bigendian uint64.
func (e encoding) xuint64Unpack(xv interface{}) (uint64, bool) {
switch e {
default:
......@@ -279,7 +305,7 @@ func (e encoding) xuint64Unpack(xv interface{}) (uint64, bool) {
}
// xuint64Pack packs v into big-endian 8-byte string
// xuint64Pack packs v into big-endian 8-byte string.
func (e encoding) xuint64Pack(v uint64) interface{} {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
......@@ -298,26 +324,54 @@ func (e encoding) xuint64Pack(v uint64) interface{} {
}
}
// Tid converts tid into corresponding object appropriate for encoding e.
func (e encoding) Tid(tid zodb.Tid) interface{} {
return e.xuint64Pack(uint64(tid))
}
// Oid converts oid into corresponding object appropriate for encoding e.
func (e encoding) Oid(oid zodb.Oid) interface{} {
return e.xuint64Pack(uint64(oid))
}
// asTid tries to retrieve Tid from corresponding object decoded via encoding e.
func (e encoding) asTid(xv interface{}) (zodb.Tid, bool) {
v, ok := e.xuint64Unpack(xv)
return zodb.Tid(v), ok
}
// asOid tries to retrieve Oid from corresponding object decoded via encoding e.
func (e encoding) asOid(xv interface{}) (zodb.Oid, bool) {
v, ok := e.xuint64Unpack(xv)
return zodb.Oid(v), ok
}
// asTuple tries to decode object as tuple. XXX
// tuple represents py tuple.
type tuple []interface{}
// Tuple converts t into corresponding object appropriate for encoding e.
func (e encoding) Tuple(t tuple) interface{} {
switch e {
default:
panic("bug")
case 'Z':
// pickle: -> pickle.Tuple
return pickle.Tuple(t)
case 'M':
// msgpack: -> leave as tuple
// However shamaton/msgpack encodes tuple(nil) as nil, not empty tuple
// so nil -> tuple{}
if t == nil {
t = tuple{}
}
return t
}
}
// asTuple tries to retrieve tuple from corresponding object decoded via encoding e.
func (e encoding) asTuple(xt interface{}) (tuple, bool) {
switch e {
default:
......@@ -341,7 +395,7 @@ func (e encoding) asTuple(xt interface{}) (tuple, bool) {
}
}
// asBytes tries to decode object as raw bytes.
// asBytes tries to retrieve bytes from corresponding object decoded via encoding e.
func (e encoding) asBytes(xb interface{}) ([]byte, bool) {
switch e {
default:
......@@ -362,7 +416,7 @@ func (e encoding) asBytes(xb interface{}) ([]byte, bool) {
}
}
// asString tries to decode object as string.
// asString tries to retrieve string from corresponding object decoded via encoding e.
func (e encoding) asString(xs interface{}) (string, bool) {
switch e {
default:
......
......@@ -229,23 +229,6 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return nil
}
// tuple represents py tuple.
type tuple []interface{}
// msg represents 1 message.
type msg struct {
msgid int64
flags msgFlags
method string
arg interface{} // can be e.g. tuple(arg1, arg2, ...)
}
type msgFlags int64
const (
msgAsync msgFlags = 1 // message does not need a reply
msgExcept = 2 // exception was raised on remote side (ZEO5)
)
// Call makes 1 RPC call to server, waits for reply and returns it.
func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, err error) {
......@@ -273,7 +256,7 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (
msgid: callID,
flags: 0,
method: method,
arg: tuple(argv), // XXX zl.enc.Tuple(argv...)
arg: zl.enc.Tuple(argv),
})
// ok, pkt is ready to go
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment