Commit a16e8d52 authored by Kirill Smelkov's avatar Kirill Smelkov

X teach golang to access ZEO

For the reference on deco (performance, frequence not fixed):

	name                           time/object
	deco/fs1/zhash.py              15.8µs ± 2%
	deco/fs1/zhash.py-P16           116µs ±12%
	deco/fs1/zhash.go              2.60µs ± 0%
	deco/fs1/zhash.go+prefetch128  3.70µs ±11%
	deco/fs1/zhash.go-P16          13.4µs ±43%
	deco/zeo/zhash.py               316µs ± 7%
	deco/zeo/zhash.py-P16          2.68ms ± 7%
	deco/zeo/zhash.go               111µs ± 2%
	deco/zeo/zhash.go+prefetch128  57.7µs ± 2%
	deco/zeo/zhash.go-P16          1.23ms ± 5%
parent 39a77e3b
......@@ -1129,7 +1129,11 @@ var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer.
//
// rx error, if any, is returned as is and is analyzed in serveRecv
//
// XXX dup in ZEO.
func (nl *NodeLink) recvPkt() (*pktBuf, error) {
// FIXME if rxbuf is non-empty - first look there for header and then if
// we know size -> allocate pkt with that size.
pkt := pktAlloc(4096)
// len=4K but cap can be more since pkt is from pool - use all space to buffer reads
// XXX vvv -> pktAlloc() ?
......
......@@ -65,6 +65,7 @@ func (e *_HandshakeError) Error() string {
}
func handshake(ctx context.Context, conn net.Conn, version uint32) (err error) {
// XXX simplify -> errgroup
errch := make(chan error, 2)
// tx handshake word
......
......@@ -1047,11 +1047,6 @@ zbench() {
nrun zhash.py --check=$zhashok --bench=$topic/%s --$zhashfunc $url
echo -e "\n# ${Npar} clients in parallel"
nrunpar zhash.py --check=$zhashok --bench=$topic/%s-P$Npar --$zhashfunc $url
if [[ $url == zeo://* ]]; then
echo "(skipping zhash.go on ZEO -- Cgo does not support zeo:// protocol)"
return
fi
echo
zbench_go $url $topic $zhashok
}
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Utilities related to python pickles
package pickletools
import (
"math/big"
)
// Xint64 tries to convert unpickled value to int64.
//
// (ogórek decodes python long as big.Int)
func Xint64(xv interface{}) (v int64, ok bool) {
switch v := xv.(type) {
case int64:
return v, true
case *big.Int:
if v.IsInt64() {
return v.Int64(), true
}
}
return 0, false
}
......@@ -28,13 +28,13 @@ import (
"fmt"
"io"
"io/ioutil"
"math/big"
"os"
"path/filepath"
"strconv"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1/fsb"
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
pickle "github.com/kisielk/og-rek"
......@@ -220,20 +220,6 @@ func (e *IndexLoadError) Error() string {
return s
}
// xint64 tries to convert unpickled value to int64
func xint64(xv interface{}) (v int64, ok bool) {
switch v := xv.(type) {
case int64:
return v, true
case *big.Int:
if v.IsInt64() {
return v.Int64(), true
}
}
return 0, false
}
// LoadIndex loads index from a reader
func LoadIndex(r io.Reader) (fsi *Index, err error) {
var picklePos int64
......@@ -255,7 +241,7 @@ func LoadIndex(r io.Reader) (fsi *Index, err error) {
if err != nil {
return nil, err
}
topPos, ok := xint64(xtopPos)
topPos, ok := pickletools.Xint64(xtopPos)
if !ok {
return nil, fmt.Errorf("topPos is %T:%v (expected int64)", xtopPos, xtopPos)
}
......
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package zeo provides simple ZEO client
package zeo
import (
"context"
"encoding/binary"
"fmt"
"net/url"
"sync"
pickle "github.com/kisielk/og-rek"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/zodb"
)
type zeo struct {
srv *zLink
// state we get from server by way of server notificatons.
mu sync.Mutex
lastTid zodb.Tid
url string // we were opened via this
}
func (z *zeo) LastTid(ctx context.Context) (zodb.Tid, error) {
z.mu.Lock()
defer z.mu.Unlock()
return z.lastTid, nil
}
func (z *zeo) Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
// defer func() ...
buf, serial, err := z._Load(ctx, xid)
if err != nil {
err = &zodb.OpError{URL: z.URL(), Op: "load", Args: xid, Err: err}
}
return buf, serial, err
}
func (z *zeo) _Load(ctx context.Context, xid zodb.Xid) (*mem.Buf, zodb.Tid, error) {
rpc := z.rpc("loadBefore")
xres, err := rpc.call(ctx, oidPack(xid.Oid), tidPack(xid.At+1)) // XXX at2Before
if err != nil {
return nil, 0, err
}
// (data, serial, next_serial | None)
res, ok := xres.(pickle.Tuple)
if !ok || len(res) != 3 {
return nil, 0, rpc.ereplyf("got %#v; expect 3-tuple", res)
}
data, ok1 := res[0].(string)
serial, ok2 := tidUnpack(res[1])
// next_serial (res[2]) - just ignore
if !(ok1 && ok2) {
return nil, 0, rpc.ereplyf("got (%T, %v, %T); expect (str, tid, .)", res...)
}
return &mem.Buf{Data: mem.Bytes(data)}, serial, nil
}
func (z *zeo) Iterate(ctx context.Context, tidMin, tidMax zodb.Tid) zodb.ITxnIterator {
panic("TODO")
}
// errorUnexpectedReply is returned by zLink.Call callers when reply was
// received successfully, but is not what the caller expected.
type errorUnexpectedReply struct {
Addr string
Method string
Err error
}
func (e *errorUnexpectedReply) Error() string {
return fmt.Sprintf("%s: call %s: unexpected reply: %s", e.Addr, e.Method, e.Err)
}
func ereplyf(addr, method, format string, argv ...interface{}) *errorUnexpectedReply {
return &errorUnexpectedReply{
Addr: addr,
Method: method,
Err: fmt.Errorf(format, argv...),
}
}
// rpc returns rpc object handy to make calls/create errors
func (z *zeo) rpc(method string) rpc {
return rpc{zl: z.srv, method: method}
}
type rpc struct {
zl *zLink
method string
}
// rpcExcept represents generic exception
type rpcExcept struct {
exc string
argv []interface{}
}
func (r *rpcExcept) Error() string {
return fmt.Sprintf("exception: %s %q", r.exc, r.argv)
}
func (r rpc) call(ctx context.Context, argv ...interface{}) (interface{}, error) {
reply, err := r.zl.Call(ctx, r.method, argv...)
if err != nil {
return nil, err
}
if reply.flags & msgExcept == 0 {
return reply.arg, nil
}
// exception - let's decode it
// ('type', (arg1, arg2, arg3, ...))
texc, ok := reply.arg.(pickle.Tuple)
if !ok || len(texc) != 2 {
return nil, r.ereplyf("except: got %#v; expect 2-tuple", reply.arg)
}
exc, ok1 := texc[0].(string)
argv, ok2 := texc[1].(pickle.Tuple)
if !(ok1 && ok2) {
return nil, r.ereplyf("except: got (%T, %T); expect (str, tuple)", texc...)
}
// translate well-known exceptions
switch exc {
case "ZODB.POSException.POSKeyError":
// POSKeyError(oid)
if len(argv) != 1 {
return nil, r.ereplyf("poskeyerror: got %#v; expect 1-tuple", argv...)
}
oid, ok := oidUnpack(argv[0])
if !ok {
return nil, r.ereplyf("poskeyerror: got (%v); expect (oid)", argv[0])
}
// XXX POSKeyError does not allow to distinguish whether it is
// no object at all or object exists and its data was not found
// for tid_before. IOW we cannot translate to zodb.NoDataError
return nil, &zodb.NoObjectError{Oid: oid}
}
return nil, &rpcExcept{exc, argv}
}
func (r rpc) ereplyf(format string, argv ...interface{}) *errorUnexpectedReply {
return ereplyf(r.zl.link.RemoteAddr().String(), r.method, format, argv...)
}
// ---- open ----
func openByURL(ctx context.Context, u *url.URL, opt *zodb.OpenOptions) (_ zodb.IStorageDriver, err error) {
url := u.String()
defer xerr.Contextf(&err, "open %s:", url)
// zeo://host:port/path?storage=...&...
var net xnet.Networker
var addr string
if u.Host != "" {
net = xnet.NetPlain("tcp")
addr = u.Host
} else {
net = xnet.NetPlain("unix")
addr = u.Path
}
storageID := "1"
q := u.Query()
if s := q.Get("storage"); s != "" {
storageID = s
}
if !opt.ReadOnly {
return nil, fmt.Errorf("TODO write mode not implemented")
}
zl, err := dialZLink(ctx, net, addr)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
zl.Close()
}
}()
z := &zeo{srv: zl, url: url}
rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
if err != nil {
return nil, err
}
lastTid, ok := tidUnpack(xlastTid) // XXX -> xlastTid -> scan
if !ok {
return nil, rpc.ereplyf("got %v; expect tid", xlastTid)
}
z.lastTid = lastTid
//call('get_info') -> {}str->str, ex // XXX can be omitted
/*
{'interfaces': (('ZODB.interfaces', 'IStorageRestoreable'),
('ZODB.interfaces', 'IStorageIteration'),
('ZODB.interfaces', 'IStorageUndoable'),
('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
('ZODB.interfaces', 'IExternalGC'),
('ZODB.interfaces', 'IStorage'),
('zope.interface', 'Interface')),
'length': 2128,
'name': '/home/kirr/src/neo/src/lab.nexedi.com/kirr/neo/go/neo/t/var/wczblk1-8/fs1/data.fs',
'size': 8630075,
'supportsUndo': True,
'supports_record_iternext': True})
*/
return z, nil
}
func (z *zeo) Close() error {
return z.srv.Close()
}
func (z *zeo) URL() string {
return z.url
}
func init() {
zodb.RegisterDriver("zeo", openByURL)
}
// ---- oid/tid packing ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
func xuint64Unpack(xv interface{}) (uint64, bool) {
s, ok := xv.(string)
if !ok || len(s) != 8 {
return 0, false
}
return binary.BigEndian.Uint64(mem.Bytes(s)), true
}
// xuint64Pack packs v into big-endian 8-byte string
func xuint64Pack(v uint64) string {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
return mem.String(b[:])
}
func tidPack(tid zodb.Tid) string {
return xuint64Pack(uint64(tid))
}
func oidPack(oid zodb.Oid) string {
return xuint64Pack(uint64(oid))
}
func tidUnpack(xv interface{}) (zodb.Tid, bool) {
v, ok := xuint64Unpack(xv)
return zodb.Tid(v), ok
}
func oidUnpack(xv interface{}) (zodb.Oid, bool) {
v, ok := xuint64Unpack(xv)
return zodb.Oid(v), ok
}
This diff is collapsed.
......@@ -23,11 +23,12 @@
//
// import _ "lab.nexedi.com/kirr/neo/go/zodb/wks"
//
// and this way automatically link in support for file:// neo:// ... and other
// common storages.
// and this way automatically link in support for file:// zeo:// neo:// ... and
// other common storages.
package wks
import (
_ "lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
_ "lab.nexedi.com/kirr/neo/go/zodb/storage/zeo"
_ "lab.nexedi.com/kirr/neo/go/neo"
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment