Commit 75dc3060 authored by Kirill Smelkov's avatar Kirill Smelkov

X zodb: Goodby XTid; Load always loads `At`; loadSerial is actually not needed

also:

- DataTid -> DataTidHint + 0 if there is no such hint.
parent ba61dad6
......@@ -36,6 +36,7 @@ import (
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
......@@ -459,13 +460,11 @@ func (c *Client) _Load(ctx context.Context, xid zodb.Xid) (*zodb.Buf, zodb.Tid,
// FIXME ^^^ slink.CloseAccept after really dialed (not to deadlock if
// S decides to send us something)
req := neo.GetObject{Oid: xid.Oid}
if xid.TidBefore {
req.Serial = neo.INVALID_TID
req.Tid = xid.Tid
} else {
req.Serial = xid.Tid
req.Tid = neo.INVALID_TID
// on the wire it comes as "before", not "at"
req := neo.GetObject{
Oid: xid.Oid,
Tid: common.At2Before(xid.At),
Serial: neo.INVALID_TID,
}
resp := neo.AnswerObject{}
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package common provides internal bits shared in between NEO client and server packages.
package common
import (
"lab.nexedi.com/kirr/neo/go/zodb"
)
// At2Before converts at to before for ZODB load semantics taking edge cases into account.
//
// For most values it is
//
// before = at + 1 ; at < ∞
//
// but at ∞ (zodb.TidMax) it is just
//
// before = at ; at = ∞
func At2Before(at zodb.Tid) (before zodb.Tid) {
if at < zodb.TidMax {
return at + 1
} else {
// XXX do we need to care here also for at > zodb.TidMax (zodb.Tid is currently unsigned)
return zodb.TidMax
}
}
// Before2At is the reverse function to At2Before
func Before2At(before zodb.Tid) (at zodb.Tid) {
if before < zodb.TidMax {
// XXX before = 0 ?
return before - 1
} else {
// XXX before > zodb.TidMax (same as in At2Before) ?
return zodb.TidMax
}
}
......@@ -39,6 +39,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/client"
"lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/storage/fs1"
......@@ -428,7 +429,7 @@ func TestMasterStorage(t *testing.T) {
// C starts loading first object ...
wg = &errgroup.Group{}
xid1 := zodb.Xid{Oid: 1, XTid: zodb.XTid{Tid: zodb.TidMax, TidBefore: true}}
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
buf1, serial1, err := zstor.Load(bg, xid1)
exc.Raiseif(err)
gox(wg, func() {
......@@ -462,7 +463,7 @@ func TestMasterStorage(t *testing.T) {
// ... -> GetObject(xid1)
tc.Expect(conntx("c:2", "s:3", 3, &neo.GetObject{
Oid: xid1.Oid,
Tid: xid1.Tid,
Tid: common.At2Before(xid1.At),
Serial: neo.INVALID_TID,
}))
tc.Expect(conntx("s:3", "c:2", 3, &neo.AnswerObject{
......@@ -503,32 +504,24 @@ func TestMasterStorage(t *testing.T) {
t.Fatalf("ziter.NextData: %v", err)
}
for _, tidBefore := range []bool{false, true} {
xid := zodb.Xid{Oid: datai.Oid} // {=,<}tid:oid
xid.Tid = datai.Tid
xid.TidBefore = tidBefore
if tidBefore {
xid.Tid++
}
// TODO also test GetObject(tid=ø, serial=...) which originate from loadSerial on py side
xid := zodb.Xid{Oid: datai.Oid, At: datai.Tid}
buf, tid, err := C.Load(bg, xid)
if datai.Data != nil {
if !(bytes.Equal(buf.Data, datai.Data) && tid == datai.Tid && err == nil) {
t.Fatalf("load: %v:\nhave: %v %v %q\nwant: %v nil %q",
xid, tid, err, buf.Data, datai.Tid, datai.Data)
}
} else {
// deleted
errWant := &zodb.ErrXidMissing{xid}
if !(buf == nil && tid == 0 && reflect.DeepEqual(err, errWant)) {
t.Fatalf("load: %v:\nhave: %v, %#v, %#v\nwant: %v, %#v, %#v",
xid, tid, err, buf, zodb.Tid(0), errWant, []byte(nil))
}
buf, serial, err := C.Load(bg, xid)
if datai.Data != nil {
if !(bytes.Equal(buf.Data, datai.Data) && serial == datai.Tid && err == nil) {
t.Fatalf("load: %v:\nhave: %v %v %q\nwant: %v nil %q",
xid, serial, err, buf.Data, datai.Tid, datai.Data)
}
} else {
// deleted
errWant := &zodb.ErrXidMissing{xid}
if !(buf == nil && serial == 0 && reflect.DeepEqual(err, errWant)) {
t.Fatalf("load: %v:\nhave: %v, %#v, %#v\nwant: %v, %#v, %#v",
xid, serial, err, buf, zodb.Tid(0), errWant, []byte(nil))
}
}
}
}
......@@ -610,9 +603,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
b.Fatal(err)
}
xid1 := zodb.Xid{Oid: 1}
xid1.Tid = zodb.TidMax
xid1.TidBefore = true
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
buf1, serial1, err := zstor.Load(ctx, xid1)
if err != nil {
......
......@@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo/internal/common"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
......@@ -542,22 +543,29 @@ func (stor *Storage) serveClient1(ctx context.Context, req neo.Msg) (resp neo.Ms
case *neo.GetObject:
xid := zodb.Xid{Oid: req.Oid}
if req.Serial != neo.INVALID_TID {
xid.Tid = req.Serial
xid.TidBefore = false
xid.At = req.Serial
} else {
xid.Tid = req.Tid
xid.TidBefore = true
xid.At = common.Before2At(req.Tid)
}
buf, tid, err := stor.zstor.Load(ctx, xid)
buf, serial, err := stor.zstor.Load(ctx, xid)
if err != nil {
// translate err to NEO protocol error codes
return neo.ErrEncode(err)
}
// for loadSerial - check we have exact hit - else "nodata"
if req.Serial != neo.INVALID_TID {
if serial != req.Serial {
// XXX actually show in error it was strict "=" load
return neo.ErrEncode(&zodb.ErrXidMissing{xid})
}
}
return &neo.AnswerObject{
Oid: xid.Oid,
Serial: tid,
Serial: serial,
Compression: false,
Data: buf,
......
......@@ -176,7 +176,6 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
if err != nil {
return err
}
before := lastTid + 1 // XXX overflow ?
if false {
defer profile.Start(profile.TraceProfile).Stop()
......@@ -190,7 +189,7 @@ func zhash(ctx context.Context, url string, h hasher, useprefetch bool, bench, c
nread := 0
loop:
for {
xid := zodb.Xid{Oid: oid, XTid: zodb.XTid{Tid: before, TidBefore: true}}
xid := zodb.Xid{Oid: oid, At: lastTid}
if xid.Oid % nprefetch == 0 {
prefetchBlk(ctx, xid)
}
......
......@@ -43,9 +43,9 @@ type Cache struct {
mu sync.RWMutex
// cache is fully synchronized with storage for transactions with tid < before.
// XXX clarify ^^^ (it means if revCacheEntry.before=∞ it is Cache.before)
before zodb.Tid
// cache is fully synchronized with storage for transactions with tid <= head.
// XXX clarify ^^^ (it means if revCacheEntry.head=∞ it is Cache.head)
head zodb.Tid
entryMap map[zodb.Oid]*oidCacheEntry // oid -> oid's cache entries
......@@ -64,7 +64,7 @@ type oidCacheEntry struct {
sync.Mutex
// cached revisions in ascending order
// [i].serial < [i].before <= [i+1].serial < [i+1].before
// [i].serial <= [i].head < [i+1].serial <= [i+1].head
//
// NOTE ^^^ .serial = 0 while loading is in progress
// NOTE ^^^ .serial = 0 if .err != nil
......@@ -76,21 +76,21 @@ type revCacheEntry struct {
parent *oidCacheEntry // oidCacheEntry holding us
inLRU lruHead // in Cache.lru; protected by Cache.gcMu
// we know that loadBefore(oid, .before) will give this .serial:oid.
// we know that load(oid, .head) will give this .serial:oid.
//
// this is only what we currently know - not necessarily covering
// whole correct range - e.g. if oid revisions in db are 1 and 5 if we
// query db with loadBefore(3) on return we'll get serial=1 and
// remember .before as 3. But for loadBefore(4) we have to redo
// query db with load(@3) on return we'll get serial=1 and
// remember .head as 3. But for load(@4) we have to redo
// database query again.
//
// if .before=∞ here, that actually means before is cache.before
// ( this way we do not need to bump .before to next tid in many
// if .head=∞ here, that actually means head is cache.head
// ( this way we do not need to bump .head to next tid in many
// unchanged cache entries when a transaction invalidation comes )
//
// .before can be > cache.before and still finite - that represents a
// case when loadBefore with tid > cache.before was called.
before zodb.Tid
// .head can be > cache.head and still finite - that represents a
// case when load with tid > cache.head was called.
head zodb.Tid
// loading result: object (buf, serial) or error
buf *zodb.Buf
......@@ -166,13 +166,6 @@ func (c *Cache) Load(ctx context.Context, xid zodb.Xid) (buf *zodb.Buf, serial z
return nil, 0, rce.userErr(xid)
}
// for loadSerial - check we have exact hit - else "nodata"
if !xid.TidBefore {
if rce.serial != xid.Tid {
return nil, 0, &zodb.ErrXidMissing{xid}
}
}
rce.buf.XIncref()
return rce.buf, rce.serial, nil
}
......@@ -199,26 +192,15 @@ func (c *Cache) Prefetch(ctx context.Context, xid zodb.Xid) {
// lookupRCE returns revCacheEntry corresponding to xid.
//
// If xid indicates loadSerial query (xid.TidBefore=false) then rce will be
// lookuped and eventually loaded as if it was queried with <(serial+1).
// It is caller responsibility to check loadSerial cases for exact hits after
// rce will become ready.
//
// rceNew indicates whether rce is new and so loading on it has not been
// initiated yet. If so the caller should proceed to loading rce via loadRCE.
func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// loadSerial(serial) -> loadBefore(serial+1)
before := xid.Tid
if !xid.TidBefore {
before++ // XXX overflow
}
// oid -> oce (oidCacheEntry) ; create new empty oce if not yet there
// exit with oce locked and cache.before read consistently
// exit with oce locked and cache.syncedTo read consistently
c.mu.RLock()
oce := c.entryMap[xid.Oid]
cacheBefore := c.before
cacheHead := c.head
if oce != nil {
oce.Lock()
......@@ -232,51 +214,51 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
oce = &oidCacheEntry{}
c.entryMap[xid.Oid] = oce
}
cacheBefore = c.before // reload c.before because we relocked the cache
cacheHead = c.head // reload c.head because we relocked the cache
oce.Lock()
c.mu.Unlock()
}
// oce, before -> rce (revCacheEntry)
// oce, at -> rce (revCacheEntry)
l := len(oce.rcev)
i := sort.Search(l, func(i int) bool {
before_i := oce.rcev[i].before
if before_i == zodb.TidMax {
before_i = cacheBefore
head_i := oce.rcev[i].head
if head_i == zodb.TidMax {
head_i = cacheHead
}
return before <= before_i
return xid.At <= head_i
})
switch {
// not found - before > max(rcev.before) - insert new max entry
// not found - at > max(rcev.head) - insert new max entry
case i == l:
rce = oce.newRevEntry(i, before)
if rce.before == cacheBefore {
rce = oce.newRevEntry(i, xid.At)
if rce.head == cacheHead {
// FIXME better do this when the entry becomes loaded ?
// XXX vs concurrent invalidations?
rce.before = zodb.TidMax
rce.head = zodb.TidMax
}
rceNew = true
// found:
// before <= rcev[i].before
// before > rcev[i-1].before
// at <= rcev[i].head
// at > rcev[i-1].head
// exact match - we already have entry for this before
case before == oce.rcev[i].before:
// exact match - we already have entry for this at
case xid.At == oce.rcev[i].head:
rce = oce.rcev[i]
// non-exact match:
// - same entry if q(before) ∈ (serial, before]
// - we can also reuse this entry if q(before) < before and err="nodata"
// - same entry if q(at) ∈ [serial, head]
// - we can also reuse this entry if q(at) <= head and err="nodata"
case oce.rcev[i].loaded() && (
(oce.rcev[i].err == nil && oce.rcev[i].serial < before) ||
(isErrNoData(oce.rcev[i].err) && before < oce.rcev[i].before)):
(oce.rcev[i].err == nil && oce.rcev[i].serial <= xid.At) ||
(isErrNoData(oce.rcev[i].err) && xid.At <= oce.rcev[i].head)):
rce = oce.rcev[i]
// otherwise - insert new entry
default:
rce = oce.newRevEntry(i, before)
rce = oce.newRevEntry(i, xid.At)
rceNew = true
}
......@@ -290,10 +272,7 @@ func (c *Cache) lookupRCE(xid zodb.Xid) (rce *revCacheEntry, rceNew bool) {
// loading completion is signalled by closing rce.ready.
func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
oce := rce.parent
buf, serial, err := c.loader.Load(ctx, zodb.Xid{
Oid: oid,
XTid: zodb.XTid{Tid: rce.before, TidBefore: true},
})
buf, serial, err := c.loader.Load(ctx, zodb.Xid{At: rce.head, Oid: oid})
// normalize buf/serial if it was error
if err != nil {
......@@ -305,16 +284,16 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
rce.serial = serial
rce.buf = buf
rce.err = err
// verify db gives serial < before
if rce.serial >= rce.before {
rce.errDB(oid, "load(<%v) -> %v", rce.before, serial)
// verify db gives serial <= head
if rce.serial > rce.head {
rce.errDB(oid, "load(@%v) -> %v", rce.head, serial)
}
close(rce.ready)
δsize := rce.buf.Len()
// merge rce with adjacent entries in parent
// ( e.g. loadBefore(3) and loadBefore(4) results in the same data loaded if
// ( e.g. load(@3) and load(@4) results in the same data loaded if
// there are only revisions with serials 1 and 5 )
oce.Lock()
i := oce.find(rce)
......@@ -391,7 +370,7 @@ func (c *Cache) loadRCE(ctx context.Context, rce *revCacheEntry, oid zodb.Oid) {
//
// both prev and next must be already loaded.
// prev and next must come adjacent to each other in parent.rcev with
// prev.before < next.before .
// prev.head < next.head .
//
// cur must be one of either prev or next and indicates which rce is current
// and so may be adjusted with consistency check error.
......@@ -406,31 +385,31 @@ func tryMerge(prev, next, cur *revCacheEntry, oid zodb.Oid) bool {
// can merge if consistent if
// (if merging)
//
// Pok Nok Ns < Pb Ps = Ns
// Pe Nok Ns < Pb Pe != "nodata" (e.g. it was IO loading error for P)
// Pok Nok Ns <= Ph Ps = Ns
// Pe Nok Ns <= Ph Pe != "nodata" (e.g. it was IO loading error for P)
// Pok Ne ---
// Ne Pe (Pe="nodata") && (Ne="nodata") -> XXX vs deleteObject?
// -> let deleted object actually read
// -> as special non-error value
//
// b - before
// h - head
// s - serial
// e - error
if next.err == nil && next.serial < prev.before {
if next.err == nil && next.serial <= prev.head {
// drop prev
prev.parent.del(prev)
// check consistency
switch {
case prev.err == nil && prev.serial != next.serial:
cur.errDB(oid, "load(<%v) -> %v; load(<%v) -> %v",
prev.before, prev.serial, next.before, next.serial)
cur.errDB(oid, "load(@%v) -> %v; load(@%v) -> %v",
prev.head, prev.serial, next.head, next.serial)
case prev.err != nil && !isErrNoData(prev.err):
if cur.err == nil {
cur.errDB(oid, "load(<%v) -> %v; load(<%v) -> %v",
prev.before, prev.err, next.before, next.serial)
cur.errDB(oid, "load(@%v) -> %v; load(@%v) -> %v",
prev.head, prev.err, next.head, next.serial)
}
}
......@@ -532,13 +511,13 @@ func isErrNoData(err error) bool {
return true
}
// newRevEntry creates new revCacheEntry with .before and inserts it into .rcev @i.
// newRevEntry creates new revCacheEntry with .head and inserts it into .rcev @i.
// (if i == len(oce.rcev) - entry is appended)
func (oce *oidCacheEntry) newRevEntry(i int, before zodb.Tid) *revCacheEntry {
func (oce *oidCacheEntry) newRevEntry(i int, head zodb.Tid) *revCacheEntry {
rce := &revCacheEntry{
parent: oce,
serial: 0,
before: before,
head: head,
ready: make(chan struct{}),
}
rce.inLRU.Init() // initially not on Cache.lru list
......@@ -596,7 +575,7 @@ func (rce *revCacheEntry) loaded() bool {
// userErr returns error that, if any, needs to be returned to user from Cache.Load
//
// ( ErrXidMissing contains xid for which it is missing. In cache we keep such
// xid with max .before but users need to get ErrXidMissing with their own query )
// xid with max .head but users need to get ErrXidMissing with their own query )
func (rce *revCacheEntry) userErr(xid zodb.Xid) error {
switch e := rce.err.(type) {
case *zodb.ErrXidMissing:
......
This diff is collapsed.
......@@ -168,13 +168,8 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *zodb.Buf, tid
}
func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*zodb.Buf, zodb.Tid, error) {
tidBefore := xid.XTid.Tid
if !xid.XTid.TidBefore {
tidBefore++ // XXX recheck this is ok wrt overflow
}
// search backwards for when we first have data record with tid satisfying xid.XTid
for dh.Tid >= tidBefore {
// search backwards for when we first have data record with tid satisfying xid.At
for {
err := dh.LoadPrevRev(fs.file)
if err != nil {
if err == io.EOF {
......@@ -186,11 +181,10 @@ func (fs *FileStorage) _Load(dh *DataHeader, xid zodb.Xid) (*zodb.Buf, zodb.Tid,
return nil, 0, err
}
}
// found dh.Tid < tidBefore; check it really satisfies xid.XTid
if !xid.XTid.TidBefore && dh.Tid != xid.XTid.Tid {
return nil, 0, &zodb.ErrXidMissing{Xid: xid}
if dh.Tid <= xid.At {
break
}
}
// even if we will scan back via backpointers, the tid returned should
......@@ -274,7 +268,7 @@ func (zi *zIter) NextData(_ context.Context) (*zodb.DataInfo, error) {
zi.datai.Tid = zi.iter.Datah.Tid
// NOTE dh.LoadData() changes dh state while going through backpointers -
// - need to use separate dh because of this
// - need to use separate dh because of this.
zi.dhLoading = zi.iter.Datah
if zi.dataBuf != nil {
zi.dataBuf.Release()
......@@ -286,7 +280,11 @@ func (zi *zIter) NextData(_ context.Context) (*zodb.DataInfo, error) {
}
zi.datai.Data = zi.dataBuf.Data
zi.datai.DataTid = zi.dhLoading.Tid
if zi.dhLoading.Tid != zi.datai.Tid {
zi.datai.DataTidHint = zi.dhLoading.Tid
} else {
zi.datai.DataTidHint = 0
}
return &zi.datai, nil
}
......
......@@ -41,10 +41,10 @@ type dbEntry struct {
// one entry inside transaction
type txnEntry struct {
Header DataHeader
rawData []byte // what is on disk, e.g. it can be backpointer
userData []byte // data client should see on load; `sameAsRaw` means same as RawData
dataTid zodb.Tid // data tid client should see on iter; 0 means same as Header.Tid
Header DataHeader
rawData []byte // what is on disk, e.g. it can be backpointer
userData []byte // data client should see on load; `sameAsRaw` means same as RawData
DataTidHint zodb.Tid // data tid client should see on iter
}
var sameAsRaw = []byte{0}
......@@ -58,15 +58,6 @@ func (txe *txnEntry) Data() []byte {
return data
}
// DataTid returns data tid a client should see
func (txe *txnEntry) DataTid() zodb.Tid {
dataTid := txe.dataTid
if dataTid == 0 {
dataTid = txe.Header.Tid
}
return dataTid
}
// state of an object in the database for some particular revision
type objState struct {
tid zodb.Tid
......@@ -137,29 +128,26 @@ func TestLoad(t *testing.T) {
// XXX check Load finds data at correct .Pos / etc ?
// loadSerial
xid := zodb.Xid{zodb.XTid{txh.Tid, false}, txh.Oid}
// ~ loadSerial
xid := zodb.Xid{txh.Tid, txh.Oid}
checkLoad(t, fs, xid, objState{txh.Tid, txe.Data()})
// loadBefore
xid = zodb.Xid{zodb.XTid{txh.Tid, true}, txh.Oid}
// ~ loadBefore
xid = zodb.Xid{txh.Tid - 1, txh.Oid}
expect, ok := before[txh.Oid]
if ok {
checkLoad(t, fs, xid, expect)
}
// loadBefore to get current record
xid.Tid += 1
checkLoad(t, fs, xid, objState{txh.Tid, txe.Data()})
before[txh.Oid] = objState{txh.Tid, txe.Data()}
}
}
// loadBefore with TidMax
// load at ∞ with TidMax
// XXX should we get "no such transaction" with at > head?
for oid, expect := range before {
xid := zodb.Xid{zodb.XTid{zodb.TidMax, true}, oid}
xid := zodb.Xid{zodb.TidMax, oid}
checkLoad(t, fs, xid, expect)
}
}
......@@ -268,8 +256,8 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
dataErrorf("data mismatch:\nhave %q\nwant %q", datai.Data, txe.Data())
}
if datai.DataTid != txe.DataTid() {
dataErrorf("data tid mismatch: have %v; want %v", datai.DataTid, txe.DataTid())
if datai.DataTidHint != txe.DataTidHint {
dataErrorf("data tid hint mismatch: have %v; want %v", datai.DataTidHint, txe.DataTidHint)
}
}
}
......
......@@ -119,11 +119,11 @@ def main():
datatid = "/* deleted */ 0"
else:
data = "[]byte(%s)" % escapeqq(drec.data)
datatid = hex64(drec.data_txn)
datatid = "/* copy from */ " + hex64(drec.data_txn)
else:
rawdata = drec.data
data = "/* same as ^^^ */ sameAsRaw"
datatid = "/* same as ^^^ */ 0"
datatid = "/* no copy */ 0"
emit("\t\t\t\t[]byte(%s)," % escapeqq(rawdata))
emit("\t\t\t\t%s," % data)
......
......@@ -61,60 +61,26 @@ func (oid Oid) XFmtString(b []byte) []byte {
return xfmt.AppendHex016(b, uint64(oid))
}
// bint converts bool to int with true => 1; false => 0.
//
// XXX place = ?
func bint(b bool) int {
if b {
return 1
} else {
return 0
}
}
// String converts xtid to string.
//
// Default xtid string representation is:
//
// - "=" or "<" character depending on whether xtid represents exact or "tid before" query
// - tid
//
// e.g.
//
// =0285cbac258bf266 - exactly 0285cbac258bf266
// <0285cbac258bf266 - before 0285cbac258bf266
//
// See also: ParseXTid.
func (xtid XTid) String() string {
// XXX also print "tid:" prefix ?
return fmt.Sprintf("%c%v", "=<"[bint(xtid.TidBefore)], xtid.Tid)
}
// String converts xid to string.
//
// Default xid string representation is:
//
// - string of xtid
// - string of at
// - ":"
// - string of oid
//
// e.g.
//
// =0285cbac258bf266:0000000000000001 - oid 1 at exactly 0285cbac258bf266 transaction
// <0285cbac258bf266:0000000000000001 - oid 1 at first newest transaction changing it with tid < 0285cbac258bf266
// 0285cbac258bf266:0000000000000001 - oid 1 at first newest transaction changing it with tid <= 0285cbac258bf266
//
// See also: ParseXid.
func (xid Xid) String() string {
return xid.XTid.String() + ":" + xid.Oid.String()
return xid.At.String() + ":" + xid.Oid.String()
}
/* TODO reenable?
func (xtid XTid) XFmtString(b []byte) []byte {
b .C("=<"[bint(xtid.TidBefore)]) .V(xtid.Tid)
}
func (xid Xid) XFmtString(b xfmt.Buffer) xfmt.Buffer {
b .V(xid.XTid) .C(':') .V(xid.Oid)
b .V(xid.At) .C(':') .V(xid.Oid)
}
*/
......@@ -151,55 +117,23 @@ func ParseOid(s string) (Oid, error) {
return Oid(x), err
}
// ParseXTid parses xtid from string.
//
// See also: XTid.String .
func ParseXTid(s string) (XTid, error) {
if len(s) < 1 {
goto Error
}
{
var tidBefore bool
switch s[0] {
case '<':
tidBefore = true
case '=':
tidBefore = false
default:
goto Error
}
tid, err := ParseTid(s[1:])
if err != nil {
goto Error
}
return XTid{tid, tidBefore}, nil
}
Error:
return XTid{}, fmt.Errorf("xtid %q invalid", s)
}
// ParseXid parses xid from string.
//
// See also: Xid.String .
func ParseXid(s string) (Xid, error) {
xtids, oids, err := xstrings.Split2(s, ":")
ats, oids, err := xstrings.Split2(s, ":")
if err != nil {
goto Error
}
{
xtid, err1 := ParseXTid(xtids)
at, err1 := ParseTid(ats)
oid, err2 := ParseOid(oids)
if err1 != nil || err2 != nil {
goto Error
}
return Xid{xtid, oid}, nil
return Xid{at, oid}, nil
}
Error:
......
......@@ -45,30 +45,11 @@ func TestParseHex64(t *testing.T) {
}
}
func TestParseXTid(t *testing.T) {
var testv = []struct {in string; xtid XTid; estr string} {
{"", XTid{}, `xtid "" invalid`},
{"a", XTid{}, `xtid "a" invalid`},
{"0123456789abcdef", XTid{}, `xtid "0123456789abcdef" invalid`}, // XXX or let it be < by default ?
{"z0123456789abcdef", XTid{}, `xtid "z0123456789abcdef" invalid`},
{"=0123456789abcdef", XTid{0x0123456789abcdef, false}, ""},
{"<0123456789abcdef", XTid{0x0123456789abcdef, true}, ""},
}
for _, tt := range testv {
xtid, err := ParseXTid(tt.in)
if !(xtid == tt.xtid && estr(err) == tt.estr) {
t.Errorf("parsextid: %v: test error:\nhave: %v %q\nwant: %v %q",
tt.in, xtid, err, tt.xtid, tt.estr)
}
}
}
func TestParseXid(t *testing.T) {
var testv = []struct {in string; xid Xid; estr string} {
{"", Xid{}, `xid "" invalid`},
{"a", Xid{}, `xid "a" invalid`},
{"0123456789abcdef", Xid{}, `xid "0123456789abcdef" invalid`}, // XXX or let it be < by default ?
{"0123456789abcdef", Xid{}, `xid "0123456789abcdef" invalid`},
{"z0123456789abcdef", Xid{}, `xid "z0123456789abcdef" invalid`},
{"=0123456789abcdef", Xid{}, `xid "=0123456789abcdef" invalid`},
{"<0123456789abcdef", Xid{}, `xid "<0123456789abcdef" invalid`},
......@@ -76,8 +57,9 @@ func TestParseXid(t *testing.T) {
{"=0123456789abcdef|fedcba9876543210", Xid{}, `xid "=0123456789abcdef|fedcba9876543210" invalid`},
{"<0123456789abcdef|fedcba9876543210", Xid{}, `xid "<0123456789abcdef|fedcba9876543210" invalid`},
{"=0123456789abcdef:fedcba9876543210", Xid{XTid{0x0123456789abcdef, false}, 0xfedcba9876543210}, ""},
{"<0123456789abcdef:fedcba9876543210", Xid{XTid{0x0123456789abcdef, true}, 0xfedcba9876543210}, ""},
{"=0123456789abcdef:fedcba9876543210", Xid{}, `xid "=0123456789abcdef:fedcba9876543210" invalid`},
{"<0123456789abcdef:fedcba9876543210", Xid{}, `xid "<0123456789abcdef:fedcba9876543210" invalid`},
{"0123456789abcdef:fedcba9876543210", Xid{0x0123456789abcdef, 0xfedcba9876543210}, ""},
}
for _, tt := range testv {
......
......@@ -34,10 +34,12 @@ import (
// Tid is transaction identifier.
//
// In ZODB transaction identifiers are unique 64-bit integers connected to time
// when corresponding transaction was created.
// In ZODB transaction identifiers are unique 64-bit integers corresponding to
// time when transaction in question was committed.
//
// See also: XTid.
// This way tid can also be used to specify whole database state constructed
// by all cumulated transaction changes from database beginning up to, and
// including, transaction specified by tid.
type Tid uint64
// ZODB/py defines maxtid to be max signed int64 since Jun 7 2016:
......@@ -49,13 +51,29 @@ const TidMax Tid = 1<<63 - 1 // 0x7fffffffffffffff
// Oid is object identifier.
//
// In ZODB objects are uniquely identified by 64-bit integer.
// Every object can have several revisions - each committed in different transaction.
// An object can have several revisions - each committed in different transaction.
// The combination of object identifier and particular transaction (serial)
// uniquely addresses corresponding data record.
//
// See also: Xid.
type Oid uint64
// Xid is "extended" oid - that fully specifies object and query for its revision.
//
// At specifies whole database state at which object identified with Oid should
// be looked up. The object revision is taken from latest transaction modifying
// the object with tid <= At.
//
// Note that Xids are not unique - the same object revision can be addressed
// with several xids.
//
// See also: Tid, Oid.
type Xid struct {
At Tid
Oid Oid
}
// TxnInfo is metadata information about one transaction.
type TxnInfo struct {
Tid Tid
......@@ -75,11 +93,16 @@ type DataInfo struct {
Tid Tid
Data []byte // nil means: deleted XXX -> *Buf ?
// original tid data was committed at (e.g. in case of undo)
// DataTidHint is optional hint from a storage that the same data was
// already originally committed in earlier transaction, for example in
// case of undo. It is 0 if there is no such hint.
//
// FIXME we don't really need this and this unnecessarily constraints interfaces.
// originates from: https://github.com/zopefoundation/ZODB/commit/2b0c9aa4
DataTid Tid
// Storages are not obliged to provide this hint, and in particular it
// is valid for a storage to always return this as zero.
//
// In ZODB/py world this originates from
// https://github.com/zopefoundation/ZODB/commit/2b0c9aa4.
DataTidHint Tid
}
// TxnStatus represents status of a transaction
......@@ -92,22 +115,6 @@ const (
)
// XTid is "extended" transaction identifier.
//
// It defines a transaction for oid lookup - either exactly by serial, or by < beforeTid.
type XTid struct {
Tid
TidBefore bool // XXX merge into Tid itself (high bit) ?
}
// Xid is "extended" oid = oid + serial/beforeTid, completely specifying object address query.
type Xid struct {
XTid
Oid
}
// XXX add XidBefore() and XidSerial() as syntax convenience?
// ---- interfaces ----
// ErrOidMissing is an error which tells that there is no such oid in the database at all
......@@ -123,7 +130,7 @@ func (e ErrOidMissing) Error() string {
}
// ErrXidMissing is an error which tells that oid exists in the database,
// but there is no its revision satisfying xid.XTid search criteria.
// but there is no its revision satisfying xid.At search criteria.
type ErrXidMissing struct {
Xid Xid
}
......@@ -132,20 +139,17 @@ func (e *ErrXidMissing) Error() string {
return fmt.Sprintf("%v: no matching data record found", e.Xid)
}
// IStorage is the interface provided by ZODB storages
// IStorage is the interface provided when a ZODB storage is opened
type IStorage interface {
// URL returns URL of this storage
URL() string
// XXX also +StorageName() with storage driver name?
// Close closes storage
Close() error
// LastTid returns the id of the last committed transaction.
//
// If no transactions have been committed yet, LastTid returns Tid zero value.
// If no transactions have been committed yet, LastTid returns 0.
LastTid(ctx context.Context) (Tid, error)
// LastOid returns highest object id of objects committed to storage.
......@@ -154,37 +158,51 @@ type IStorage interface {
// XXX ZODB/py does not define this in IStorage.
LastOid(ctx context.Context) (Oid, error)
// Load loads data from database.
// Load loads object data addressed by xid from database.
//
// XXX currently deleted data is returned as buf.Data=nil -- is it ok?
// TODO specify error when data not found -> ErrOidMissing | ErrXidMissing
//
// NOTE ZODB/py provides 2 entrypoints in IStorage for loading:
// LoadSerial and LoadBefore but in ZODB/go we have only Load which is
// a bit different from both:
//
// The object to load is addressed by xid.
// - Load loads object data for object at database state specified by xid.At
// - LoadBefore loads object data for object at database state previous to xid.At
// it is thus equivalent to Load(..., xid.At-1)
// - LoadSerial loads object data from revision exactly modified
// by transaction with tid = xid.At.
// it is thus equivalent to Load(..., xid.At) with followup
// check that returned serial is exactly xid.At(*)
//
// NOTE ZODB/py provides 2 entrypoints in IStorage: LoadSerial and
// LoadBefore. Load generalizes them into one (see Xid for details).
// (*) LoadSerial is used only in a few places in ZODB/py - mostly in
// conflict resolution code where plain Load semantic - without
// checking object was particularly modified at that revision - would
// suffice.
//
// XXX zodb.loadBefore() returns (data, serial, serial_next) -> add serial_next?
// XXX currently deleted data is returned as buf.Data=nil -- is it ok?
// TODO specify error when data not found -> ErrOidMissing | ErrXidMissing
Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error) // XXX -> DataInfo ?
Load(ctx context.Context, xid Xid) (buf *Buf, serial Tid, err error)
// Prefetch(ctx, xid Xid) (no error)
// TODO add invalidation channel (notify about changes made to DB not by us)
// TODO: write mode
// Store(oid Oid, serial Tid, data []byte, txn ITransaction) error
// XXX Restore ?
// CheckCurrentSerialInTransaction(oid Oid, serial Tid, txn ITransaction) // XXX naming
// KeepCurrent(oid Oid, serial Tid, txn ITransaction)
// TpcBegin(txn)
// TpcVote(txn)
// TpcFinish(txn, callback)
// TpcAbort(txn)
// TODO: invalidation channel (notify about changes made to DB not by us)
// TODO:
// tpc_begin(txn)
// tpc_vote(txn)
// tpc_finish(txn, callback) XXX clarify about callback
// tpc_abort(txn)
// TODO: History(ctx, oid, size=1)
// Iterate creates iterator to iterate storage in [tidMin, tidMax] range.
//
// XXX allow iteration both ways (forward & backward)
// TODO allow iteration both ways (forward & backward)
Iterate(tidMin, tidMax Tid) ITxnIterator // XXX ctx , error ?
}
......@@ -210,7 +228,8 @@ type IDataIterator interface {
// Valid returns whether tid is in valid transaction identifiers range
func (tid Tid) Valid() bool {
if 0 <= tid && tid <= TidMax {
// NOTE 0 is invalid tid
if 0 < tid && tid <= TidMax {
return true
} else {
return false
......
......@@ -62,7 +62,7 @@ func Dumpobj(ctx context.Context, w io.Writer, stor zodb.IStorage, xid zodb.Xid,
objInfo.Oid = xid.Oid
objInfo.Tid = tid
objInfo.Data = buf.Data
objInfo.DataTid = tid // XXX generally wrong
objInfo.DataTidHint = 0 // no copy detection at catobj - just dump raw content
d := dumper{W: w, HashOnly: hashOnly}
err = d.DumpData(&objInfo)
......
......@@ -94,8 +94,8 @@ func (d *dumper) DumpData(datai *zodb.DataInfo) error {
case datai.Data == nil:
buf .S("delete")
case datai.Tid != datai.DataTid:
buf .S("from ") .V(&datai.DataTid)
case datai.DataTidHint != 0:
buf .S("from ") .V(&datai.DataTidHint)
default:
// XXX sha1 is hardcoded for now. Dump format allows other hashes.
......
......@@ -53,15 +53,13 @@ http://docs.pylonsproject.org/projects/zodburi/
const helpXid =
`An object address for loading from ZODB should be specified as follows:
- "=" or "<" character depending on whether it is exact or "tid before" query
- tid
- ":"
- oid
for example
=0285cbac258bf266:0000000000000001 - oid 1 at exactly 0285cbac258bf266 transaction
<0285cbac258bf266:0000000000000001 - oid 1 at first newest transaction changing it with tid < 0285cbac258bf266
0285cbac258bf266:0000000000000001 - oid 1 at first newest transaction changing it with tid <= 0285cbac258bf266
`
var helpTopics = prog.HelpRegistry{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment