Commit 0696cd64 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: Introduce notion of encoding

Keep information about which message encoding is used on the wire in
encoding type. Make pktDecode/pktEncode and data type conversion
utilities be methods of this type. For now there is only 'pickles'
encoding, but we'll soon introduce 'msgpack'.

Currently not everything related to pickles is localized in encoding -
we'll be moving more bits to encoding in the followup patches.
parent ca7e96e0
......@@ -19,6 +19,7 @@
package zeo
// Protocol for exchanged ZEO messages.
// On the wire messages are encoded via pickles.
// Each message is wrapped into packet with be32 header of whole packet size.
// See https://github.com/zopefoundation/ZEO/blob/5.2.1-20-gcb26281d/doc/protocol.rst for details.
......@@ -35,6 +36,12 @@ import (
)
// msg represents 1 message.
// arg is arbitrary argument(s) passed/received along ZEO call or reply.
//
// for objects in arg user code has to obtain them via encoding.*Unpack() and
// set them via encoding.*Pack() methods that
// convert application-level data into objects properly corresponding to wire
// encoding of messages.
type msg struct {
msgid int64
flags msgFlags
......@@ -48,10 +55,30 @@ const (
msgExcept = 2 // exception was raised on remote side (ZEO5)
)
// encoding represents messages encoding.
type encoding byte // Z - pickles
// ---- message encode/decode ↔ packet ----
// pktEncode encodes message into raw packet.
func pktEncode(m msg) *pktBuf {
func (e encoding) pktEncode(m msg) *pktBuf {
switch e {
case 'Z': return pktEncodeZ(m)
default: panic("bug")
}
}
// pktDecode decodes raw packet into message.
func (e encoding) pktDecode(pkb *pktBuf) (msg, error) {
switch e {
case 'Z': return pktDecodeZ(pkb)
default: panic("bug")
}
}
// pktEncodeZ encodes message into raw Z (pickle) packet.
func pktEncodeZ(m msg) *pktBuf {
pkb := allocPkb()
p := pickle.NewEncoder(pkb)
err := p.Encode(pickle.Tuple{m.msgid, m.flags, m.method, m.arg})
......@@ -61,8 +88,8 @@ func pktEncode(m msg) *pktBuf {
return pkb
}
// pktDecode decodes raw packet into message.
func pktDecode(pkb *pktBuf) (msg, error) {
// pktDecodeZ decodes raw Z (pickle) packet into message.
func pktDecodeZ(pkb *pktBuf) (msg, error) {
var m msg
// must be (msgid, False|0, ".reply", res)
d := pickle.NewDecoder(bytes.NewReader(pkb.Payload()))
......@@ -105,41 +132,53 @@ func derrf(format string, argv ...interface{}) error {
}
// ---- oid/tid packing ----
// ---- retrieve/put objects from/into msg.arg ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
func xuint64Unpack(xv interface{}) (uint64, bool) {
v, err := pickletools.Xstrbytes8(xv)
if err != nil {
return 0, false
func (e encoding) xuint64Unpack(xv interface{}) (uint64, bool) {
switch e {
default:
panic("bug")
case 'Z':
// pickle: str|bytes
v, err := pickletools.Xstrbytes8(xv)
if err != nil {
return 0, false
}
return v, true
}
return v, true
}
// xuint64Pack packs v into big-endian 8-byte string
//
// XXX do we need to emit bytes instead of str?
func xuint64Pack(v uint64) string {
func (e encoding) xuint64Pack(v uint64) string {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
return mem.String(b[:])
switch e {
default:
panic("bug")
case 'Z':
// pickle: -> str XXX do we need to emit bytes instead of str?
return mem.String(b[:])
}
}
func tidPack(tid zodb.Tid) string {
return xuint64Pack(uint64(tid))
func (e encoding) tidPack(tid zodb.Tid) string {
return e.xuint64Pack(uint64(tid))
}
func oidPack(oid zodb.Oid) string {
return xuint64Pack(uint64(oid))
func (e encoding) oidPack(oid zodb.Oid) string {
return e.xuint64Pack(uint64(oid))
}
func tidUnpack(xv interface{}) (zodb.Tid, bool) {
v, ok := xuint64Unpack(xv)
func (e encoding) tidUnpack(xv interface{}) (zodb.Tid, bool) {
v, ok := e.xuint64Unpack(xv)
return zodb.Tid(v), ok
}
func oidUnpack(xv interface{}) (zodb.Oid, bool) {
v, ok := xuint64Unpack(xv)
func (e encoding) oidUnpack(xv interface{}) (zodb.Oid, bool) {
v, ok := e.xuint64Unpack(xv)
return zodb.Oid(v), ok
}
......@@ -63,7 +63,7 @@ func (z *zeo) Sync(ctx context.Context) (head zodb.Tid, err error) {
return zodb.InvalidTid, err
}
head, ok := tidUnpack(xhead)
head, ok := z.link.enc.tidUnpack(xhead)
if !ok {
return zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xhead)
}
......@@ -79,7 +79,8 @@ func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb
}()
rpc := z.rpc("loadBefore")
xres, err := rpc.call(ctx, oidPack(xid.Oid), tidPack(xid.At+1)) // XXX at2Before
enc := z.link.enc
xres, err := rpc.call(ctx, enc.oidPack(xid.Oid), enc.tidPack(xid.At+1)) // XXX at2Before
if err != nil {
return nil, 0, err
}
......@@ -91,7 +92,7 @@ func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb
}
data, ok1 := res[0].(string)
serial, ok2 := tidUnpack(res[1])
serial, ok2 := enc.tidUnpack(res[1])
// next_serial (res[2]) - just ignore
if !(ok1 && ok2) {
......@@ -183,7 +184,7 @@ func (r rpc) excError(exc string, argv []interface{}) error {
return r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...)
}
oid, ok := oidUnpack(argv[0])
oid, ok := r.zlink.enc.oidUnpack(argv[0])
if !ok {
return r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0])
}
......@@ -360,7 +361,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
}
}
lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan
lastTid, ok := zlink.enc.tidUnpack(xlastTid) // XXX -> xlastTid -> scan
if !ok {
return nil, zodb.InvalidTid, rpc.ereplyf("got %v; expect tid", xlastTid)
}
......
......@@ -68,7 +68,8 @@ type zLink struct {
down1 sync.Once
errDown error // error with which the link was shut down
ver string // protocol verision in use (without "Z" or "M" prefix)
ver string // protocol version in use (without "Z" or "M" prefix)
enc encoding // protocol encoding in use (always 'Z')
}
// (called after handshake)
......@@ -135,7 +136,7 @@ func (zl *zLink) serveRecv() {
// serveRecv1 handles 1 incoming packet.
func (zl *zLink) serveRecv1(pkb *pktBuf) error {
// decode packet
m, err := pktDecode(pkb)
m, err := zl.enc.pktDecode(pkb)
if err != nil {
return err
}
......@@ -183,7 +184,7 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (
zl.callMu.Unlock()
// (msgid, async, method, argv)
pkb := pktEncode(msg{
pkb := zl.enc.pktEncode(msg{
msgid: callID,
flags: 0,
method: method,
......@@ -389,7 +390,8 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
// even if server announced it prefers 'M' (msgpack) it will
// accept 'Z' (pickles) as encoding. We always use 'Z'.
//
enc := encoding('Z')
// extract peer version from protocol string and choose actual
// version to use as min(peer, mybest)
ver := proto[1:]
......@@ -413,13 +415,14 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
// version selected - now send it back to server as
// corresponding handshake reply.
pkb = allocPkb()
pkb.WriteString("Z" + ver)
pkb.WriteString(fmt.Sprintf("%c%s", enc, ver))
err = zl.sendPkt(pkb)
if err != nil {
return fmt.Errorf("tx: %s", err)
}
zl.ver = ver
zl.enc = enc
close(hok)
return nil
})
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment