Commit 30f5ddc7 authored by Kirill Smelkov's avatar Kirill Smelkov

ΔFtail += .Epoch in δf

Unfortunately I could not avoid computing diff for ZBigFile objects
themselves, because after recent ΔFtail and ΔZtail fixes, wcfs tests
started to fail because in the following scenario

    δFtail is created                        @at0
    file is created                          @at1
    file starts to be tracked when δFtail is @at2
    δFtail query is made about file block -> boom reports @at0, because the file does not exist there at all

To fix this, ΔFtail now detects when ZBigFile objects are changed
themselves and indicates such a change with a special δf with
δf.Epoch=true .

* t2+δfepoch: (38 commits)
  X ΔFtail: Rebuild vδE after first track
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  ...
parents 7547a4d2 bf3ace66
......@@ -453,8 +453,8 @@ func diffX(ctx context.Context, a, b Node, δZTC setOid, trackSet blib.PPTreeSub
// a, b point to top of subtrees @old and @new revisions.
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
func diffT(ctx context.Context, A, B *Tree, δZTC setOid, trackSet blib.PPTreeSubSet) (δ map[Key]ΔValue, δtrack *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
tracefDiff(" diffT %s %s\n", xidOf(A), xidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xidOf(A), xidOf(B))
tracefDiff(" diffT %s %s\n", xzodb.XidOf(A), xzodb.XidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xzodb.XidOf(A), xzodb.XidOf(B))
δ = map[Key]ΔValue{}
δtrack = blib.NewΔPPTreeSubSet()
......@@ -887,8 +887,8 @@ func δMerge(δ, δ2 map[Key]ΔValue) error {
// diffB computes difference in between two buckets.
// see diffX for details.
func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
tracefDiff(" diffB %s %s\n", xidOf(a), xidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xidOf(a), xidOf(b))
tracefDiff(" diffB %s %s\n", xzodb.XidOf(a), xzodb.XidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xzodb.XidOf(a), xzodb.XidOf(b))
// XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately)
var av []BucketEntry
......@@ -952,13 +952,10 @@ func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
// zgetNodeOrNil returns btree node corresponding to zconn.Get(oid) .
// if the node does not exist, (nil, ok) is returned.
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ Node, err error) {
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (node Node, err error) {
defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At())
xnode, err := zconn.Get(ctx, oid)
if err != nil {
if xzodb.IsErrNoData(err) {
err = nil
}
xnode, err := xzodb.ZGetOrNil(ctx, zconn, oid)
if xnode == nil || err != nil {
return nil, err
}
......@@ -966,20 +963,6 @@ func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_
if !ok {
return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode))
}
// activate the node to find out it really exists
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = node.PActivate(ctx)
if err != nil {
if xzodb.IsErrNoData(err) {
return nil, nil
}
return nil, err
}
node.PDeactivate()
return node, nil
}
......@@ -993,15 +976,6 @@ func vOid(xvalue interface{}) (zodb.Oid, error) {
return value.POid(), nil
}
// xidOf return string representation of object xid.
func xidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
func (rn *nodeInRange) String() string {
done := " "; if rn.done { done = "*" }
return fmt.Sprintf("%s%s%s", done, rn.keycov, vnode(rn.node))
......
......@@ -266,6 +266,16 @@ func (t *Commit) XGetBlkData(oid zodb.Oid) string {
return zblki.Data
}
// XGetBlkByName returns oid and ZBlk info of the ZBlk whose .Name == name.
//
// It panics if no ZBlk with such name is present in t.ZBlkTab.
func (t *Commit) XGetBlkByName(name string) (zodb.Oid, ZBlkInfo) {
	var foundOid zodb.Oid = zodb.InvalidOid
	var foundInfo ZBlkInfo
	found := false

	// scan the block table for an entry carrying the requested name
	for oid, zblki := range t.ZBlkTab {
		if zblki.Name == name {
			foundOid, foundInfo, found = oid, zblki, true
			break
		}
	}
	if !found {
		panicf("ZBlk<%q> not found", name)
	}
	return foundOid, foundInfo
}
// xGetTree loads Tree from zurl@at->obj<root>.
//
......
......@@ -221,6 +221,18 @@ def TreesSrv(zstor, r):
xprint("%s" % ashex(head))
continue
# øf command to delete the file
if treetxt == "øf":
head = commitDelete(zfile, subj)
xprint("%s" % ashex(head))
continue
# make sure we continue with undeleted ztree/zfile
if deleted(ztree):
undelete(ztree)
if deleted(zfile):
undelete(zfile)
# t... D... commands to natively commit updates to tree and values
if treetxt.startswith('t'):
t, D = treetxt.split()
......@@ -480,8 +492,18 @@ def commitDelete(obj, description): # -> tid
# reset transaction to a new one
transaction.begin()
obj._v_deleted = True
return tid
# deleted reports whether obj was deleted via commitDelete.
def deleted(obj): # -> bool
    # commitDelete marks deleted objects with a volatile _v_deleted flag;
    # objects that were never deleted simply lack the attribute.
    flag = getattr(obj, '_v_deleted', False)
    return flag
# undelete forces recreation for obj that was previously deleted via commitDelete.
def undelete(obj):
    # mark the object as modified so that ZODB re-persists it on commit,
    # and drop the volatile deletion flag set by commitDelete.
    obj._p_changed = True
    delattr(obj, '_v_deleted')
# ztreetxt returns text representation of a ZODB tree.
@func(ZCtx)
......
......@@ -305,24 +305,42 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail rebuildAll")
// XXX locking
trackNewRoots := δBtail.trackNewRoots
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots)
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), trackNewRoots)
for root := range δBtail.trackNewRoots {
delete(δBtail.trackNewRoots, root)
δBtail.rebuild1(root)
}
for root := range trackNewRoots {
δTtail := δBtail.vδTbyRoot[root] // must be there
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
if err != nil {
return err
}
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
return nil
}
// rebuild1IfNeeded rebuilds ΔBtail for single root if that root needs rebuilding.
//
// It is a no-op if root is not in trackNewRoots.
func (δBtail *ΔBtail) rebuild1IfNeeded(root zodb.Oid) error {
	// XXX locking
	_, ok := δBtail.trackNewRoots[root]
	if !ok {
		return nil
	}

	// NOTE: remove only this root from trackNewRoots. Resetting the whole
	// set (trackNewRoots = setOid{}) here would silently drop pending
	// rebuild requests for all other roots, leaving their ΔTtail stale.
	delete(δBtail.trackNewRoots, root)
	return δBtail.rebuild1(root)
}
// rebuild1 rebuilds ΔBtail for single root.
func (δBtail *ΔBtail) rebuild1(root zodb.Oid) error {
	// XXX locking

	// δTtail must be present: rebuild1 is invoked only for tracked roots.
	δTtail := δBtail.vδTbyRoot[root]

	trackDelta, revDelta, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
	if err == nil {
		// merge what the rebuild found into global tracking state
		δBtail.trackSet.UnionInplace(trackDelta)
		δBtail.vδBroots_Update(root, revDelta)
	}
	return err
}
// rebuild rebuilds ΔTtail taking trackNew requests into account.
//
// It returns:
......@@ -333,7 +351,7 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
//
// XXX place
func (δTtail *ΔTtail) rebuild(root zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (δtrackSet blib.PPTreeSubSet, δrevSet setTid, err error) {
defer xerr.Context(&err, "ΔTtail rebuild")
defer xerr.Contextf(&err, "ΔTtail<%s> rebuild", root)
// XXX locking
tracefΔBtail("\nRebuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
......@@ -732,6 +750,7 @@ func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
}
func (δTtail *ΔTtail) forgetPast(revCut zodb.Tid) {
// XXX locking
// XXX lastRevOf
icut := 0
......@@ -760,13 +779,18 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
// XXX key not tracked -> panic
// XXX at not ∈ (tail, head] -> panic
// XXX dirty -> rebuild
// XXX locking
rootAt := root.PJar().At()
if rootAt != δBtail.Head() {
panicf("δBtail: root.at (@%s) != head (@%s)", rootAt, δBtail.Head())
}
err = δBtail.rebuild1IfNeeded(root.POid())
if err != nil {
return
}
δTtail := δBtail.vδTbyRoot[root.POid()]
if δTtail == nil {
panicf("δBtail: root<%s> not tracked", root.POid())
......@@ -805,6 +829,9 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
// @tail[key] is not present - key was not changing in (tail, head].
// since at ∈ (tail, head] we can use @head[key] as the result
xvalue, ok, err := root.Get(ctx, key)
if !ok {
value = VDEL
}
if err != nil || !ok {
return
}
......@@ -837,7 +864,12 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonly*/[]ΔTree {
xtail.AssertSlice(δBtail, lo, hi)
// XXX locking
// XXX rebuild
err := δBtail.rebuild1IfNeeded(root)
if err != nil {
panic(err) // XXX
}
δTtail, ok := δBtail.vδTbyRoot[root]
if !ok {
return []ΔTree{}
......@@ -874,6 +906,11 @@ func (δBtail *ΔBtail) ΔZtail() /*readonly*/*zodb.ΔTail {
return δBtail.δZtail
}
// DB returns database handle that δBtail is using to access ZODB.
//
// NOTE(review): presumably the same handle that was passed to NewΔBtail —
// confirm at the construction site.
func (δBtail *ΔBtail) DB() *zodb.DB {
	return δBtail.db
}
func tracefΔBtail(format string, argv ...interface{}) {
if traceΔBtail {
......
......@@ -1260,17 +1260,6 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
}
// zblkByName returns oid of ZBlk that has .Name == name
zblkByName := func(name string) zodb.Oid {
for oid, zblki := range t0.ZBlkTab {
if zblki.Name == name {
return oid
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid // XXX should be not needed
}
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
......@@ -1286,8 +1275,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
assertvδT("t2.s22", s22)
// sXX should be all aliased to vδT
gg := zblkByName("g")
hh := zblkByName("h")
gg, _ := t0.XGetBlkByName("g")
hh, _ := t0.XGetBlkByName("h")
vδT[0].Rev = t0.At; δkv0 := vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{11:{gg,gg}}
vδT[1].Rev = t0.At; δkv1 := vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{12:{hh,hh}}
assertvδT("t2.vδT*", vδT, ΔT{t0.At, δ{11:{g,g}}}, ΔT{t0.At, δ{12:{h,h}}})
......@@ -1331,8 +1320,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
assertvδT("t12.s22_", s22_)
// sXX_ should be all aliased to vδT, but not sXX
bb := zblkByName("b")
cc := zblkByName("c")
bb, _ := t0.XGetBlkByName("b")
cc, _ := t0.XGetBlkByName("c")
vδT[0].Rev = t0.At; δkv0 = vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{111:{bb,bb}}
vδT[1].Rev = t0.At; δkv1 = vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{112:{cc,cc}}
assertvδT("t12.vδT*", vδT, ΔT{t0.At, δ{111:{b,b}}}, ΔT{t0.At, δ{112:{c,c}}})
......
......@@ -24,8 +24,10 @@ import (
"context"
"errors"
"fmt"
"reflect"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -82,6 +84,33 @@ func ZOpen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn,
}, nil
}
// ZGetOrNil returns zconn.Get(oid), or (nil, nil) if the object does not exist.
func ZGetOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ zodb.IPersistent, err error) {
	defer xerr.Contextf(&err, "zget %s@%s", oid, zconn.At())

	obj, err := zconn.Get(ctx, oid)
	switch {
	case err == nil:
		// object retrieved from the connection; existence still unverified
	case IsErrNoData(err):
		return nil, nil
	default:
		return nil, err
	}

	// activate the object to find out it really exists:
	// after removal on storage, the object might have stayed in Connection
	// cache due to e.g. PCachePinObject, and it will be PActivate that
	// will return "deleted" error.
	if err = obj.PActivate(ctx); err != nil {
		if IsErrNoData(err) {
			return nil, nil
		}
		return nil, err
	}
	obj.PDeactivate()

	return obj, nil
}
// IsErrNoData returns whether err is due to NoDataError or NoObjectError.
func IsErrNoData(err error) bool {
var eNoData *zodb.NoDataError
......@@ -96,3 +125,12 @@ func IsErrNoData(err error) bool {
return false
}
}
// XidOf returns string representation of object xid.
//
// For a nil object "ø" is returned.
func XidOf(obj zodb.IPersistent) string {
	// nil interface, or interface holding a typed nil pointer → no object
	if obj == nil || reflect.ValueOf(obj).IsNil() {
		return "ø"
	}
	return zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}.String()
}
......@@ -19,18 +19,23 @@
package zdata
// XXX note about ΔFtail organization: queries results are built on the fly to
// avoid complexity of recomputing vδF on tracking set change.
import (
"context"
"fmt"
"sort"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/btree"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/set"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xbtree"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xtail"
// "lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xzodb"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xzodb"
)
type setI64 = set.I64
......@@ -46,12 +51,14 @@ type setOid = set.Oid
//
// δF:
// .rev↑
// {} file -> {}blk
// {} file -> {}blk | EPOCH
//
// Only files and blocks explicitly requested to be tracked are guaranteed to
// be present. In particular a block that was not explicitly requested to be
// tracked, even if it was changed in δZ, is not guaranteed to be present in δF.
//
// XXX after an epoch, previous track requests have no effect
//
// ΔFtail provides the following operations:
//
// .Track(file, blk, path, zblk) - add file and block reached via BTree path to tracked set.
......@@ -75,13 +82,33 @@ type setOid = set.Oid
type ΔFtail struct {
// ΔFtail merges ΔBtail with history of ZBlk
δBtail *xbtree.ΔBtail
fileIdx map[zodb.Oid]setOid // tree-root -> {} ZBigFile<oid> as of @head
rootIdx map[zodb.Oid]zodb.Oid // file -> tree-root
fileIdx map[zodb.Oid]setOid // tree-root -> {} ZBigFile<oid> as of @head XXX -> root2file ?
byFile map[zodb.Oid]*_ΔFileTail // file -> vδf tail XXX
// set of files, which are newly tracked and for which vδE was not yet rebuilt
trackNew setOid // {}foid
trackSetZFile setOid // set of tracked ZBigFiles as of @head
trackSetZBlk map[zodb.Oid]*zblkTrack // zblk -> {} root -> {}blk as of @head
}
// _ΔFileTail represents tail of revisional changes to one file.
type _ΔFileTail struct {
	root zodb.Oid      // .blktab as of @head
	vδE  []_ΔFileEpoch // changes to ZBigFile object itself ; nil if not yet rebuilt ; entries are ordered by Rev↑
}
// _ΔFileEpoch represents a change to the ZBigFile object itself
// (as opposed to changes of the data blocks it references).
type _ΔFileEpoch struct {
	Rev        zodb.Tid // revision at which the ZBigFile object was changed
	oldRoot    zodb.Oid // .blktab was pointing to oldRoot before ; VDEL if ZBigFile deleted
	newRoot    zodb.Oid // .blktab was changed to point to newRoot ; ----//----
	newBlkSize int64    // .blksize was changed to newBlkSize ; -1 if ZBigFile deleted
	// XXX +oldBlkSize ?

	// snapshot of trackSetZBlk for this file right before this epoch
	oldTrackSetZBlk map[zodb.Oid]setI64 // {} zblk -> {}blk
}
// zblkTrack keeps information in which root/blocks ZBlk is present as of @head.
type zblkTrack struct {
inroot map[zodb.Oid]setI64 // {} root -> {}blk
......@@ -96,14 +123,11 @@ type ΔF struct {
// ΔFile represents a change to one file.
type ΔFile struct {
	Rev    zodb.Tid // revision at which the change happened
	Epoch  bool     // whether file changed completely
	Blocks setI64   // changed blocks XXX -> ΔBlocks ?
	Size   bool     // whether file size changed XXX -> ΔSize?
}
// XXX note about ΔFtail organization: queries results are built on the fly to
// avoid complexity of recomputing vδF on tracking set change.
// NewΔFtail creates new ΔFtail object.
//
// Initial tracked set is empty.
......@@ -115,8 +139,8 @@ func NewΔFtail(at0 zodb.Tid, db *zodb.DB) *ΔFtail {
return &ΔFtail{
δBtail: xbtree.NewΔBtail(at0, db),
fileIdx: map[zodb.Oid]setOid{},
rootIdx: map[zodb.Oid]zodb.Oid{},
trackSetZFile: setOid{},
byFile: map[zodb.Oid]*_ΔFileTail{},
trackNew: setOid{},
trackSetZBlk: map[zodb.Oid]*zblkTrack{},
}
}
......@@ -160,18 +184,17 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb
}
files.Add(foid)
roid_, ok := δFtail.rootIdx[foid]
if ok {
if roid != roid_ {
panicf("zfile<%s> changed root from %s -> %s", foid, roid_, roid)
}
} else {
δFtail.rootIdx[foid] = roid
δftail, ok := δFtail.byFile[foid]
if !ok {
δftail = &_ΔFileTail{root: roid, vδE: nil /*will need to be rebuilt till past*/}
δFtail.byFile[foid] = δftail
δFtail.trackNew.Add(foid)
}
if δftail.root != roid {
panicf("zfile<%s> changed root from %s -> %s", foid, δftail.root, roid)
}
δFtail.trackSetZFile.Add(foid)
// associate zblk with file, if it was not hole
if zblk != nil {
zoid := zblk.POid()
......@@ -193,6 +216,90 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, zb
}
}
// rebuildAll rebuilds vδE for all files from trackNew requests.
func (δFtail *ΔFtail) rebuildAll() (err error) {
	defer xerr.Contextf(&err, "ΔFtail rebuildAll")
	// XXX locking

	δBtail := δFtail.δBtail
	zδtail, zdb := δBtail.ΔZtail(), δBtail.DB()

	// NOTE deleting from a map while ranging over it is well-defined in Go
	for foid := range δFtail.trackNew {
		delete(δFtail.trackNew, foid)
		if e := δFtail.byFile[foid].rebuild1(foid, zδtail, zdb); e != nil {
			return e
		}
	}
	return nil
}
// rebuildIfNeeded rebuilds vδE if there is such need.
//
// It returns the corresponding δftail for convenience.
// The only case when vδE actually needs to be rebuilt is when the file just
// started to be tracked.
func (δFtail *ΔFtail) rebuildIfNeeded(foid zodb.Oid) (_ *_ΔFileTail, err error) {
	// XXX locking
	δftail := δFtail.byFile[foid]
	if δftail.vδE == nil {
		// nil vδE marks "not yet rebuilt" (see _ΔFileTail)
		δBtail := δFtail.δBtail
		err = δftail.rebuild1(foid, δBtail.ΔZtail(), δBtail.DB())
	}
	return δftail, err
}
// rebuild1 rebuilds vδE of one file by scanning δZtail for changes to the
// ZBigFile object with oid foid.
//
// It must be called at most once per δftail (vδE must still be nil).
func (δftail *_ΔFileTail) rebuild1(foid zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (err error) {
	defer xerr.Contextf(&err, "file<%s>: rebuild", foid)

	// XXX locking

	if δftail.vδE != nil {
		panic("rebuild: vδE != nil")
	}

	vδE := []_ΔFileEpoch{}
	vδZ := δZtail.Data()
	// atPrev tracks the previous revision at which the file was changed;
	// it starts at the tail of the covered range.
	atPrev := δZtail.Tail()
	for i := 0; i < len(vδZ); i++ {
		δZ := vδZ[i]

		// skip δZ entries that do not touch foid at all
		fchanged := false
		for _, oid := range δZ.Changev {
			if oid == foid {
				fchanged = true
				break
			}
		}
		if !fchanged {
			continue
		}

		// diff the ZBigFile object in between its previous change and δZ.Rev.
		// NOTE atPrev is advanced only when the file was changed (see below),
		// so the diff always spans from the file's previously-seen state.
		δ, err := zfilediff(db, foid, atPrev, δZ.Rev)
		if err != nil {
			return err
		}

		// δ == nil means the object change did not affect .blktab/.blksize
		if δ != nil {
			δE := _ΔFileEpoch{
				Rev:             δZ.Rev,
				oldRoot:         δ.blktabOld,
				newRoot:         δ.blktabNew,
				newBlkSize:      δ.blksizeNew,
				oldTrackSetZBlk: nil, // nothing was tracked
			}
			vδE = append(vδE, δE)
		}

		atPrev = δZ.Rev
	}

	δftail.vδE = vδE
	return nil
}
// Update updates δFtail given raw ZODB changes.
//
// It returns change in files space that corresponds to δZ.
......@@ -210,6 +317,14 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
// XXX verify zhead.At() == δFtail.Head()
// XXX locking
// rebuild vδE for newly tracked files
err = δFtail.rebuildAll()
if err != nil {
return ΔF{}, err
}
headOld := δFtail.Head()
δB, err := δFtail.δBtail.Update(δZ)
if err != nil {
return ΔF{}, err
......@@ -217,6 +332,43 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
δF := ΔF{Rev: δB.Rev, ByFile: make(map[zodb.Oid]*ΔFile)}
// take ZBigFile changes into account
δzfile := map[zodb.Oid]*_ΔZBigFile{} // which tracked ZBigFiles are changed
for _, oid := range δZ.Changev {
δftail, ok := δFtail.byFile[oid]
if !ok {
continue // not ZBigFile or file is not tracked
}
δ, err := zfilediff(δFtail.δBtail.DB(), oid, headOld, δZ.Tid)
if err != nil {
return ΔF{}, err
}
//fmt.Printf("zfile<%s> diff %s..%s -> δ: %v\n", oid, headOld, δZ.Tid, δ)
if δ != nil {
// XXX rebuild first
δzfile[oid] = δ
δE := _ΔFileEpoch{
Rev: δZ.Tid,
oldRoot: δ.blktabOld,
newRoot: δ.blktabNew,
newBlkSize: δ.blksizeNew,
oldTrackSetZBlk: map[zodb.Oid]setI64{},
}
for oid, zt := range δFtail.trackSetZBlk {
inblk, ok := zt.inroot[δftail.root]
if ok {
δE.oldTrackSetZBlk[oid] = inblk
delete(zt.inroot, δftail.root)
}
}
δftail.root = δE.newRoot
δftail.vδE = append(δftail.vδE, δE)
}
}
// take btree changes into account
// fmt.Printf("δB.ΔByRoot: %v\n", δB.ΔByRoot)
for root, δt := range δB.ΔByRoot {
......@@ -276,13 +428,6 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
// take zblk changes into account
for _, oid := range δZ.Changev {
if δFtail.trackSetZFile.Has(oid) {
// TODO check that .blksize and .blktab (it is only
// persistent reference) do not change.
return ΔF{}, fmt.Errorf("ZBigFile<%s> changed @%s", oid, δZ.Tid)
}
zt, ok := δFtail.trackSetZBlk[oid]
if !ok {
continue // not tracked
......@@ -306,6 +451,25 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
}
}
// if ZBigFile object is changed - it starts new epoch for that file
for foid, δ := range δzfile {
δfile, ok := δF.ByFile[foid]
if !ok {
δfile = &ΔFile{Rev: δF.Rev}
δF.ByFile[foid] = δfile
}
δfile.Epoch = true
δfile.Blocks = nil
δfile.Size = false
// XXX + rebuild XXX not here - in track(new file)
_ = δ
//fmt.Printf("δZBigFile: %v\n", δ)
// XXX update .fileIdx
}
// fmt.Printf("-> δF: %v\n", δF)
return δF, nil
}
......@@ -314,6 +478,28 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
// ForgetPast discards all δFtail entries with rev ≤ revCut.
func (δFtail *ΔFtail) ForgetPast(revCut zodb.Tid) {
δFtail.δBtail.ForgetPast(revCut)
// XXX locking
// XXX keep index which file changed epoch where (similarly to ΔBtail),
// and, instead of scanning all files, trim vδE only on files that is really necessary.
for _, δftail := range δFtail.byFile {
δftail.forgetPast(revCut)
}
}
// forgetPast discards vδE entries with Rev ≤ revCut.
func (δftail *_ΔFileTail) forgetPast(revCut zodb.Tid) {
	// XXX locking

	// count leading entries to be cut (vδE is ordered by Rev↑)
	ncut := 0
	for _, δE := range δftail.vδE {
		if δE.Rev > revCut {
			break
		}
		ncut++
	}

	// vδE[:ncut] should be forgotten
	// XXX workaround for ΔFtail.ForgetPast calling forgetPast on all files
	if ncut > 0 {
		δftail.vδE = append([]_ΔFileEpoch{}, δftail.vδE[ncut:]...)
	}
}
// XXX don't need
......@@ -336,10 +522,12 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
xtail.AssertSlice(δFtail, lo, hi)
// XXX locking
// XXX rebuild
// query .δBtail.SliceByRootRev(file.blktab, lo, hi) +
// merge δZBlk history with that.
// merging tree (δT) and Zblk (δZblk) histories into file history (δFile):
// δT ────────·──────────────·─────────────────·────────────
......@@ -354,10 +542,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
// δFile ────────o───────o──────x─────x────────────────────────
root := δFtail.rootIdx[zfile.POid()]
vδT := δFtail.δBtail.SliceByRootRev(root, lo, δFtail.Head()) // NOTE @head, not hi
vδZ := δFtail.δBtail.ΔZtail().SliceByRev(lo, hi)
var vδf []*ΔFile
// vδfTail returns or creates vδf entry for revision tail
// tail must be <= all vδf revisions
......@@ -377,82 +561,143 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
return δfTail
}
// state of Zinblk as we are scanning ←
// initially corresponds to @head = vδT[-1]
Zinblk := map[zodb.Oid]setI64{} // zblk -> which #blk refer to it
var ZinblkAt zodb.Tid // Zinblk covers [ZinblkAt,<next δT>)
for zblk, zt := range δFtail.trackSetZBlk {
inblk, ok := zt.inroot[root]
if ok {
Zinblk[zblk] = inblk.Clone()
}
}
it := len(vδT) - 1
if it >= 0 {
ZinblkAt = vδT[it].Rev
} else {
ZinblkAt = lo
δftail, err := δFtail.rebuildIfNeeded(zfile.POid())
if err != nil {
panic(err) // XXX
}
vδZ := δFtail.δBtail.ΔZtail().SliceByRev(lo, hi)
iz := len(vδZ) - 1
for (iz >= 0 || it >= 0) {
// δZ that is covered by current Zinblk
// -> update δf
if iz >= 0 {
δZ := vδZ[iz]
if ZinblkAt <= δZ.Rev {
//fmt.Printf("δZ @%s\n", δZ.Rev)
for _, oid := range δZ.Changev {
inblk, ok := Zinblk[oid]
if ok && len(inblk) != 0 {
δf := vδfTail(δZ.Rev)
δf.Blocks.Update(inblk)
}
// find epoch that covers hi
vδE := δftail.vδE
le := len(vδE)
ie := sort.Search(le, func(i int) bool {
return hi < vδE[i].Rev
})
// vδE[ie] is next epoch
// vδE[ie-1] is epoch that covers hi
// loop through all epochs till lo
for lastEpoch := false; !lastEpoch ; {
// current epoch
var epoch zodb.Tid
ie--
if ie < 0 {
epoch = δFtail.Tail()
} else {
epoch = vδE[ie].Rev
}
if epoch <= lo {
epoch = lo
lastEpoch = true
}
var root zodb.Oid // root of blktab in current epoch
var head zodb.Tid // head] of current epoch coverage
// state of Zinblk as we are scanning ← current epoch
// initially corresponds to head of the epoch (= @head for latest epoch)
Zinblk := map[zodb.Oid]setI64{} // zblk -> which #blk refers to it
var ZinblkAt zodb.Tid // Zinblk covers [ZinblkAt,<next δT>)
if ie+1 == le {
// head
root = δftail.root
head = δFtail.Head()
for zblk, zt := range δFtail.trackSetZBlk {
inblk, ok := zt.inroot[root]
if ok {
Zinblk[zblk] = inblk.Clone()
}
iz--
continue
}
// XXX ZinblkAt
} else {
δE := vδE[ie+1]
root = δE.oldRoot
head = δE.Rev - 1 // XXX ok?
for zblk, inblk := range δE.oldTrackSetZBlk {
Zinblk[zblk] = inblk.Clone()
}
}
// δT -> adjust Zinblk + update δf
// vδT for current epoch
vδT := δFtail.δBtail.SliceByRootRev(root, epoch, head) // NOTE @head, not hi
it := len(vδT) - 1
if it >= 0 {
δT := vδT[it]
//fmt.Printf("δT @%s\n", δT.Rev)
for blk, δzblk := range δT.ΔKV {
// apply in reverse as we go ←
if δzblk.New != xbtree.VDEL {
inblk, ok := Zinblk[δzblk.New]
if ok {
inblk.Del(blk)
ZinblkAt = vδT[it].Rev
} else {
ZinblkAt = epoch
}
// merge vδZ and vδT of current epoch
for ((iz >= 0 && vδZ[iz].Rev >= epoch) || it >= 0) {
// δZ that is covered by current Zinblk
// -> update δf
if iz >= 0 {
δZ := vδZ[iz]
if ZinblkAt <= δZ.Rev {
//fmt.Printf("δZ @%s\n", δZ.Rev)
for _, oid := range δZ.Changev {
inblk, ok := Zinblk[oid]
if ok && len(inblk) != 0 {
δf := vδfTail(δZ.Rev)
δf.Blocks.Update(inblk)
}
}
iz--
continue
}
if δzblk.Old != xbtree.VDEL {
inblk, ok := Zinblk[δzblk.Old]
if !ok {
inblk = setI64{}
Zinblk[δzblk.Old] = inblk
}
// δT -> adjust Zinblk + update δf
if it >= 0 {
δT := vδT[it]
//fmt.Printf("δT @%s\n", δT.Rev)
for blk, δzblk := range δT.ΔKV {
// apply in reverse as we go ←
if δzblk.New != xbtree.VDEL {
inblk, ok := Zinblk[δzblk.New]
if ok {
inblk.Del(blk)
}
}
if δzblk.Old != xbtree.VDEL {
inblk, ok := Zinblk[δzblk.Old]
if !ok {
inblk = setI64{}
Zinblk[δzblk.Old] = inblk
}
inblk.Add(blk)
}
if δT.Rev <= hi {
δf := vδfTail(δT.Rev)
δf.Blocks.Add(blk)
δf.Size = true // see Update
}
inblk.Add(blk)
}
if δT.Rev <= hi {
δf := vδfTail(δT.Rev)
δf.Blocks.Add(blk)
δf.Size = true // see Update
it--
if it >= 0 {
ZinblkAt = vδT[it].Rev
} else {
ZinblkAt = epoch
}
}
}
it--
if it >= 0 {
ZinblkAt = vδT[it].Rev
} else {
ZinblkAt = lo
// emit epoch δf
if ie >= 0 {
epoch := vδE[ie].Rev
if epoch > lo { // it could be <=
δf := vδfTail(epoch)
δf.Epoch = true
δf.Blocks = nil // XXX must be already nil
δf.Size = false // XXX must be already false
}
}
}
// vδf was built in reverse order
// invert the order before returning
for i,j := 0, len(vδf)-1; i<j; i,j = i+1,j-1 {
......@@ -473,34 +718,190 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
//
// XXX +ctx, error rebuild []δF here
func (δFtail *ΔFtail) LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64, at zodb.Tid) (_ zodb.Tid, exact bool) {
//defer xerr.Contextf(&err, "") // XXX text
err := zf.PActivate(ctx)
rev, exact, err := δFtail._LastBlkRev(ctx, zf, blk, at)
if err != nil {
panic(err) // XXX
}
defer zf.PDeactivate()
return rev, exact
}
func (δFtail *ΔFtail) _LastBlkRev(ctx context.Context, zf *ZBigFile, blk int64, at zodb.Tid) (_ zodb.Tid, exact bool, err error) {
defer xerr.Contextf(&err, "blkrev f<%s> #%d @%s", zf.POid(), blk, at)
// XXX tabRev -> treeRev ?
zblkOid, ok, tabRev, tabRevExact, err := δFtail.δBtail.GetAt(ctx, zf.blktab, blk, at)
//fmt.Printf("GetAt #%d @%s -> %s, %v, @%s, %v\n", blk, at, zblkOid, ok, tabRev, tabRevExact)
//fmt.Printf("\nblkrev #%d @%s\n", blk, at)
// XXX locking
δftail, err := δFtail.rebuildIfNeeded(zf.POid())
if err != nil {
panic(err) // XXX
return zodb.InvalidTid, false, err
}
// find epoch that covers at and associated blktab root/object
vδE := δftail.vδE
//fmt.Printf(" vδE: %v\n", vδE)
l := len(vδE)
i := sort.Search(l, func(i int) bool {
return at < vδE[i].Rev
})
// vδE[i] is next epoch
// vδE[i-1] is epoch that covers at
// root
var root zodb.Oid
var rootObj *btree.LOBTree
if i == l {
err := zf.PActivate(ctx)
if err != nil {
// file deleted
if xzodb.IsErrNoData(err) {
root = xbtree.VDEL
} else {
return zodb.InvalidTid, false, err
}
} else {
rootObj = zf.blktab
root = rootObj.POid()
zf.PDeactivate()
}
} else {
root = vδE[i].oldRoot
}
// epoch
var epoch zodb.Tid
i--
if i < 0 {
// i<0 - first epoch (no explicit start) - use δFtail.tail as lo
epoch = δFtail.Tail()
} else {
epoch = vδE[i].Rev
}
//fmt.Printf(" epoch: @%s root: %s\n", epoch, root)
// get to rootObj (NOTE @head, because it is ΔBtail.GetAt requirement)
if rootObj == nil && root != xbtree.VDEL {
zconn := zf.PJar()
xrootObj, err := zconn.Get(ctx, root)
if err != nil {
return zodb.InvalidTid, false, err
}
var ok bool
rootObj, ok = xrootObj.(*btree.LOBTree)
if !ok {
return zodb.InvalidTid, false, fmt.Errorf("blktab: expect LOBTree; got %s", xzodb.TypeOf(xrootObj))
}
}
// XXX take epochs into account
var zblkOid zodb.Oid
var tabRev zodb.Tid
var tabRevExact, ok bool
if rootObj != nil {
zblkOid, ok, tabRev, tabRevExact, err = δFtail.δBtail.GetAt(ctx, rootObj, blk, at)
//fmt.Printf(" GetAt #%d @%s -> %s, %v, @%s, %v\n", blk, at, zblkOid, ok, tabRev, tabRevExact)
if err != nil {
return zodb.InvalidTid, false, err
}
}
if tabRev < epoch {
tabRev = epoch
tabRevExact = true
}
// block was removed
// XXX or not in tracked set?
if !ok {
return tabRev, tabRevExact
return tabRev, tabRevExact, nil
}
// blktab[blk] was changed to point to a zblk @rev.
// blk revision is max rev and when zblk changed last in (rev, at] range.
zblkRev, zblkRevExact := δFtail.δBtail.ΔZtail().LastRevOf(zblkOid, at)
//fmt.Printf("ZRevOf %s @%s -> @%s, %v\n", zblkOid, at, zblkRev, zblkRevExact)
//fmt.Printf(" ZRevOf %s @%s -> @%s, %v\n", zblkOid, at, zblkRev, zblkRevExact)
if zblkRev > tabRev {
return zblkRev, zblkRevExact
return zblkRev, zblkRevExact, nil
} else {
return tabRev, tabRevExact, nil
}
}
// ----------------------------------------
// zfilediff returns direct difference for ZBigFile<foid> old..new .
// _ΔZBigFile describes direct changes of a ZBigFile object in between two revisions.
type _ΔZBigFile struct {
	blksizeOld, blksizeNew int64    // .blksize before and after; -1 if the file is absent on that side
	blktabOld, blktabNew   zodb.Oid // .blktab oid before and after; VDEL if the file is absent on that side
}
func zfilediff(db *zodb.DB, foid zodb.Oid, old, new zodb.Tid) (δ *_ΔZBigFile, err error) {
	txn, ctx := transaction.New(context.TODO()) // XXX - merge in ctx arg?
	defer txn.Abort()

	// open connections viewing the database @old and @new
	connv := [2]*zodb.Connection{}
	for i, at := range [2]zodb.Tid{old, new} {
		connv[i], err = db.Open(ctx, &zodb.ConnOptions{At: at})
		if err != nil {
			return nil, err
		}
	}

	// load the file on both sides; nil means "file does not exist there"
	a, err1 := zgetFileOrNil(ctx, connv[0], foid)
	b, err2 := zgetFileOrNil(ctx, connv[1], foid)
	err = xerr.Merge(err1, err2)
	if err != nil {
		return nil, err
	}

	return diffF(ctx, a, b)
}
// diffF returns direct difference in between two ZBigFile objects.
func diffF(ctx context.Context, a, b *ZBigFile) (δ *_ΔZBigFile, err error) {
defer xerr.Contextf(&err, "diffF %s %s", xzodb.XidOf(a), xzodb.XidOf(b))
δ = &_ΔZBigFile{}
if a == nil {
δ.blksizeOld = -1
δ.blktabOld = xbtree.VDEL
} else {
err = a.PActivate(ctx); if err != nil { return nil, err }
defer a.PDeactivate()
δ.blksizeOld = a.blksize
δ.blktabOld = a.blktab.POid()
}
if b == nil {
δ.blksizeNew = -1
δ.blktabNew = xbtree.VDEL
} else {
return tabRev, tabRevExact
err = b.PActivate(ctx); if err != nil { return nil, err }
defer b.PDeactivate()
δ.blksizeNew = b.blksize
δ.blktabNew = b.blktab.POid()
}
// return δ=nil if no change
if δ.blksizeOld == δ.blksizeNew && δ.blktabOld == δ.blktabNew {
δ = nil
}
return δ, nil
}
// zgetFileOrNil returns ZBigFile corresponding to zconn.Get(oid) .
// if the file does not exist, (nil, nil) is returned.
func zgetFileOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (zfile *ZBigFile, err error) {
	defer xerr.Contextf(&err, "getfile %s@%s", oid, zconn.At())

	xfile, err := xzodb.ZGetOrNil(ctx, zconn, oid)
	if err != nil || xfile == nil {
		return nil, err
	}

	// the object exists - verify it is really a ZBigFile
	if zf, ok := xfile.(*ZBigFile); ok {
		return zf, nil
	}
	return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xfile))
}
......@@ -97,7 +97,8 @@ func TestΔFtail(t *testing.T) {
{δT{5:e}, δD()},
{δT{}, δD(i)},
// XXX more
// XXX text
{nil, nil},
// ---- found by TestΔFtailRandom ----
......@@ -174,7 +175,29 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
t := xbtreetest.NewT(t_)
X := exc.Raiseif
δftail := NewΔFtail(t.Head().At, t.DB)
xat := map[zodb.Tid]string{} // tid > "at<i>"
// start δFtail when zfile does not yet exists
// this way we'll verify how ΔFtail rebuilds vδE for started-to-be-tracked file
t0 := t.CommitTree("øf")
xat[t0.At] = "at0"
t.Logf("# @at0 (%s)", t0.At)
δFtail := NewΔFtail(t.Head().At, t.DB)
// load dataTab
dataTab := map[string]string{} // ZBlk<name> -> data
for /*oid*/_, zblki := range t.Head().ZBlkTab {
dataTab[zblki.Name] = zblki.Data
}
// create zfile, but do not track it yet
t1 := t.CommitTree(fmt.Sprintf("t0:a D%s", dataTabTxt(dataTab)))
xat[t1.At] = "at1"
t.Logf("# → @at1 (%s) %s\t; not-yet-tracked", t1.At, t1.Tree)
δF, err := δFtail.Update(t1.ΔZ); X(err)
if !(δF.Rev == t1.At && len(δF.ByFile) == 0) {
t.Errorf("wrong δF:\nhave {%s, %v}\nwant: {%s, ø}", δF.Rev, δF.ByFile, t1.At)
}
// load zfile via root['treegen/file']
txn, ctx := transaction.New(context.Background())
......@@ -184,41 +207,57 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
err = zroot.PActivate(ctx); X(err)
zfile := zroot.Data["treegen/file"].(*ZBigFile)
zroot.PDeactivate()
zfileOid := zfile.POid()
foid := zfile.POid()
err = zfile.PActivate(ctx); X(err)
blksize := zfile.blksize
blktabOid := zfile.blktab.POid()
if blktabOid != t.Root() {
t.Fatalf("BUG: zfile.blktab (%s) != treeroot (%s)", blktabOid, t.Root())
}
zfile.PDeactivate()
// track zfile[-∞,∞) from the beginning
// start track zfile[0,∞) from the beginning
// this should make ΔFtail to see all zfile changes
size, path, err := zfile.Size(ctx); X(err)
δftail.Track(zfile, /*blk*/-1, path, /*zblk*/nil)
if size != 0 {
t.Fatalf("BUG: zfile is not initially empty: size=%d", size)
δFtail.Track(zfile, /*blk*/-1, path, /*zblk*/nil)
if sizeOK := 1*blksize; size != sizeOK { // NOTE matches t1 commit
t.Fatalf("BUG: zfile size: have %d ; want %d", size, sizeOK)
}
// data built via applying changes from testv
vδf := []*ΔFile{} // (rev↑, {}blk)
blkTab := map[int64]string{} // #blk -> ZBlk<name>
dataTab := map[string]string{} // ZBlk<name> -> data
vδf := []*ΔFile{ // (rev↑, {}blk)
{Rev: t1.At, Epoch: true},
}
vδE := []_ΔFileEpoch{ // (rev↑, EPOCH)
{
Rev: t1.At,
oldRoot: zodb.InvalidOid,
newRoot: blktabOid,
newBlkSize: blksize,
oldTrackSetZBlk: nil,
},
}
blkTab := map[int64]string{0:"a"} // #blk -> ZBlk<name>
Zinblk := map[string]setI64{} // ZBlk<name> -> which #blk refer to it
blkRevAt := map[zodb.Tid]map[int64]zodb.Tid{} // {} at -> {} #blk -> rev
// initialize dataTab from root['treegen/values']
for /*oid*/_, zblki := range t.Head().ZBlkTab {
dataTab[zblki.Name] = zblki.Data
// retrack should be called after new epoch to track zfile[-∞,∞) again
retrack := func() {
for blk := range blkTab {
_, path, zblk, _, err := zfile.LoadBlk(ctx, blk); X(err)
δFtail.Track(zfile, blk, path, zblk)
}
}
xat := map[zodb.Tid]string{} // tid -> "at<i>"
xat[δftail.Head()] = "at0"
t.Logf("# @at0 (%s)", δftail.Head())
epochv := []zodb.Tid{t0.At, t1.At}
// δfstr/vδfstr converts δf/vδf to string taking xat into account
δfstr := func(δf *ΔFile) string {
s := fmt.Sprintf("@%s·%s", xat[δf.Rev], δf.Blocks)
if δf.Epoch {
s += "E"
}
if δf.Size {
s += "S"
}
......@@ -233,73 +272,130 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
}
i := 0
i := 1 // matches t1
delfilePrev := false
for test := range testq {
i++
δblk := setI64{}
δtree := false
delfile := false
// rebuild blkTab/Zinblk
for blk, zblk := range test.δblkTab {
zprev, ok := blkTab[blk]
if ok {
delete(Zinblk[zprev], blk)
} else {
zprev = ø
// command to delete zfile
if test.δblkTab == nil && test.δdataTab == nil {
delfile = true
}
// new epoch starts when file is deleted or recreated
newEpoch := delfile || (!delfile && delfile != delfilePrev)
delfilePrev = delfile
ZinblkPrev := map[string]setI64{}
for zblk, inblk := range Zinblk {
ZinblkPrev[zblk] = inblk.Clone()
}
// newEpoch -> reset
if newEpoch {
blkTab = map[int64]string{}
Zinblk = map[string]setI64{}
δblk = nil
} else {
// rebuild blkTab/Zinblk
for blk, zblk := range test.δblkTab {
zprev, ok := blkTab[blk]
if ok {
delete(Zinblk[zprev], blk)
} else {
zprev = ø
}
if zblk != ø {
blkTab[blk] = zblk
inblk, ok := Zinblk[zblk]
if !ok {
inblk = setI64{}
Zinblk[zblk] = inblk
}
inblk.Add(blk)
} else {
delete(blkTab, blk)
}
// update δblk due to change in blkTab
if zblk != zprev {
δblk.Add(blk)
δtree = true
}
}
if zblk != ø {
blkTab[blk] = zblk
inblk, ok := Zinblk[zblk]
// rebuild dataTab
for zblk := range test.δdataTab {
data, ok := dataTab[zblk] // e.g. a -> a2
if !ok {
inblk = setI64{}
Zinblk[zblk] = inblk
t.Fatalf("BUG: blk %s not in dataTab\ndataTab: %v", zblk, dataTab)
}
inblk.Add(blk)
} else {
delete(blkTab, blk)
}
data = fmt.Sprintf("%s%d", data[:1], i) // e.g. a4
dataTab[zblk] = data
// update δblk due to change in blkTab
if zblk != zprev {
δblk.Add(blk)
δtree = true
// update δblk due to change in ZBlk data
for blk := range Zinblk[zblk] {
δblk.Add(blk)
}
}
}
// rebuild dataTab
for zblk := range test.δdataTab {
data, ok := dataTab[zblk] // e.g. a -> a2
if !ok {
t.Fatalf("BUG: blk %s not in dataTab\ndataTab: %v", zblk, dataTab)
}
data = fmt.Sprintf("%s%d", data[:1], i) // e.g. a4
dataTab[zblk] = data
// update δblk due to change in ZBlk data
for blk := range Zinblk[zblk] {
δblk.Add(blk)
}
// commit updated zfile / blkTab + dataTab
var req string
if delfile {
req = "øf"
} else {
tTxt := "t" + xbtreetest.KVTxt(blkTab)
dTxt := "D" + dataTabTxt(dataTab)
req = tTxt + " " + dTxt
}
commit := t.CommitTree(req)
if newEpoch {
epochv = append(epochv, commit.At)
}
xat[commit.At] = fmt.Sprintf("at%d", i)
flags := ""
if newEpoch {
flags += "\tEPOCH"
}
t.Logf("# → @%s (%s) δT%s δD%s\t; %s\tδ%s%s", xat[commit.At], commit.At, xbtreetest.KVTxt(test.δblkTab), test.δdataTab, commit.Tree, δblk, flags)
//t.Logf("# vδf: %s", vδfstr(vδf))
// commit updated blkTab + dataTab
tTxt := "t" + xbtreetest.KVTxt(blkTab)
dTxt := "D" + dataTabTxt(dataTab)
commit := t.CommitTree(tTxt + " " + dTxt)
// update blkRevAt
var blkRevPrev map[int64]zodb.Tid
if i != 0 {
blkRevPrev = blkRevAt[δftail.Head()]
blkRevPrev = blkRevAt[δFtail.Head()]
}
blkRev := map[int64]zodb.Tid{}
for blk, rev := range blkRevPrev {
blkRev[blk] = rev
if newEpoch {
blkRev[blk] = commit.At
} else {
blkRev[blk] = rev
}
}
for blk := range δblk {
blkRev[blk] = commit.At
}
blkRevAt[commit.At] = blkRev
if false {
fmt.Printf("blkRevAt[@%s]:\n", xat[commit.At])
blkv := []int64{}
for blk := range blkRev {
blkv = append(blkv, blk)
}
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
})
for _, blk := range blkv {
fmt.Printf(" #%d: %v\n", blk, blkRev[blk])
}
}
// update zfile
txn.Abort()
......@@ -307,23 +403,41 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
err = zconn.Resync(ctx, commit.At); X(err)
var δfok *ΔFile
if len(δblk) != 0 {
if newEpoch || len(δblk) != 0 {
δfok = &ΔFile{
Rev: commit.At,
Epoch: newEpoch,
Blocks: δblk,
Size: δtree, // not strictly ok, but matches current ΔFtail code
}
vδf = append(vδf, δfok)
}
xat[commit.At] = fmt.Sprintf("at%d", i)
t.Logf("# → @%s (%s) δT%s δD%s\t; %s\tδ%s", xat[commit.At], commit.At, xbtreetest.KVTxt(test.δblkTab), test.δdataTab, tTxt, δblk)
//t.Logf("# vδf: %s", vδfstr(vδf))
if newEpoch {
δE := _ΔFileEpoch{Rev: commit.At}
if delfile {
δE.oldRoot = blktabOid
δE.newRoot = zodb.InvalidOid
δE.newBlkSize = -1
// XXX oldBlkSize ?
} else {
δE.oldRoot = zodb.InvalidOid
δE.newRoot = blktabOid
δE.newBlkSize = blksize
// XXX oldBlkSize ?
}
oldTrackSetZBlk := map[zodb.Oid]setI64{}
for zblk, inblk := range ZinblkPrev {
oid, _ := commit.XGetBlkByName(zblk)
oldTrackSetZBlk[oid] = inblk
}
δE.oldTrackSetZBlk = oldTrackSetZBlk
vδE = append(vδE, δE)
}
//fmt.Printf("Zinblk: %v\n", Zinblk)
// update δftail
δF, err := δftail.Update(commit.ΔZ); X(err)
// update δFtail
δF, err := δFtail.Update(commit.ΔZ); X(err)
// assert δF points to the file if δfok != ø
if δF.Rev != commit.At {
......@@ -335,7 +449,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
}
δfilesOK := setOid{}
if δfok != nil {
δfilesOK.Add(zfileOid)
δfilesOK.Add(foid)
}
if !δfiles.Equal(δfilesOK) {
t.Errorf("wrong δF.ByFile:\nhave keys: %s\nwant keys: %s", δfiles, δfilesOK)
......@@ -343,14 +457,19 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
}
// verify δf
δf := δF.ByFile[zfileOid]
δf := δF.ByFile[foid]
if !reflect.DeepEqual(δf, δfok) {
t.Errorf("δf:\nhave: %v\nwant: %v", δf, δfok)
}
// verify δftail.trackSetZBlk
// track whole zfile again if new epoch was started
if newEpoch {
retrack()
}
// verify δFtail.trackSetZBlk
trackZinblk := map[string]setI64{}
for oid, zt := range δftail.trackSetZBlk {
for oid, zt := range δFtail.trackSetZBlk {
zblki := commit.ZBlkTab[oid]
for root, blocks := range zt.inroot {
if root != blktabOid {
......@@ -375,12 +494,35 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
if len(vδf) >= ncut {
revcut := vδf[0].Rev
t.Logf("# forget ≤ @%s", xat[revcut])
δftail.ForgetPast(revcut)
δFtail.ForgetPast(revcut)
vδf = vδf[1:]
//t.Logf("# vδf: %s", vδfstr(vδf))
//t.Logf("# vδt: %s", vδfstr(δftail.SliceByFileRev(zfile, δftail.Tail(), δftail.Head())))
//t.Logf("# vδt: %s", vδfstr(δFtail.SliceByFileRev(zfile, δFtail.Tail(), δFtail.Head())))
icut := 0;
for ; icut < len(vδE); icut++ {
if vδE[icut].Rev > revcut {
break
}
}
vδE = vδE[icut:]
}
// verify δftail.root
δftail := δFtail.byFile[foid]
rootOK := blktabOid
if delfile {
rootOK = zodb.InvalidOid
}
if δftail.root != rootOK {
t.Errorf(".root: have %s ; want %s", δftail.root, rootOK)
}
// verify vδE
if !reflect.DeepEqual(δftail.vδE, vδE) {
t.Errorf("vδE:\nhave: %v\nwant: %v", δftail.vδE, vδE)
}
// SliceByFileRev
for j := 0; j < len(vδf); j++ {
for k := j; k < len(vδf); k++ {
......@@ -393,7 +535,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
hi := vδf[k].Rev
vδf_ok := vδf[j:k+1] // [j,k]
vδf_ := δftail.SliceByFileRev(zfile, lo, hi)
vδf_ := δFtail.SliceByFileRev(zfile, lo, hi)
if !reflect.DeepEqual(vδf_, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", xat[lo], xat[hi], vδfstr(vδf_), vδfstr(vδf_ok))
}
......@@ -418,10 +560,20 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
at := vδf[j].Rev
blkRev := blkRevAt[at]
for _, blk := range blkv {
rev, exact := δftail.LastBlkRev(ctx, zfile, blk, at)
revOK, exactOK := blkRev[blk], true
if revOK <= δftail.Tail() {
revOK, exactOK = δftail.Tail(), false
rev, exact := δFtail.LastBlkRev(ctx, zfile, blk, at)
revOK, ok := blkRev[blk]
if !ok {
k := len(epochv) - 1
for ; k >= 0; k-- {
if epochv[k] <= at {
break
}
}
revOK = epochv[k]
}
exactOK := true
if revOK <= δFtail.Tail() {
revOK, exactOK = δFtail.Tail(), false
}
if !(rev == revOK && exact == exactOK) {
t.Errorf("blkrev #%d @%s:\nhave: @%s, %v\nwant: @%s, %v", blk, xat[at], xat[rev], exact, xat[revOK], exactOK)
......
......@@ -882,28 +882,40 @@ retry:
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
})
size := " "
flags := ""
if δfile.Size {
size = "S"
flags += "S"
}
log.Infof("S: \t- %s\t%s %v\n", foid, size, blkv)
if δfile.Epoch {
flags += "E"
}
log.Infof("S: \t- %s\t%2s %v\n", foid, flags, blkv)
}
log.Infof("\n\n")
}
// invalidate kernel cache for file data
wg := xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile {
// // XXX needed?
// // XXX even though δBtail is complete, not all ZBlk are present here
// file.δtail.Append(δF.Rev, δfile.Blocks.Elements())
// file was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[foid]
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
return file.invalidateBlk(ctx, blk)
})
if δfile.Epoch {
// XXX while invalidating whole file at epoch is easy,
// it becomes not so easy to handle isolation if epochs
// could be present. For this reason we forbid changes
// to ZBigFile objects for now.
return fmt.Errorf("ZBigFile<%s> changed @%s", foid, δF.Rev)
// wg.Go(func(ctx context.Context) error {
// return file.invalidateAll() // NOTE does not accept ctx
// })
} else {
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
return file.invalidateBlk(ctx, blk)
})
}
}
}
err = wg.Wait()
......@@ -1112,6 +1124,21 @@ func (f *BigFile) invalidateAttr() (err error) {
return nil
}
// invalidateAll invalidates file attributes and all file data in kernel cache.
//
// It complements invalidateAttr and invalidateBlk and is used to completely
// reset kernel cache for the file on ΔFtail epoch.
// Must be called with zheadMu wlocked.
func (f *BigFile) invalidateAll() (err error) {
	defer xerr.Contextf(&err, "%s: invalidate all", f.path())

	// offset=0, length=-1 tells the kernel to drop metadata + all data
	st := gfsconn.FileNotify(f.Inode(), 0, -1)
	if st == fuse.OK {
		return nil
	}
	return syscall.Errno(st)
}
// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock.
......@@ -1291,7 +1318,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// and thus would trigger DB access again.
//
// TODO if direct-io: don't touch pagecache
// TODO upload parts only not covered by currrent read (not to e.g. wait for page lock)
// TODO upload parts only not covered by current read (not to e.g. wait for page lock)
// TODO skip upload completely if read is wide to cover whole blksize
go f.uploadBlk(blk, loading)
......@@ -1681,7 +1708,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// rlocked during pin setup.
//
// δ δ
// ----x----.------------]----x----
// ────x────.────────────]────x────
// ↑ ↑
// w.at head
//
......@@ -1700,6 +1727,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
// the file if w.at is before that time:
//
// create file
// ────.────────x────────]────
// ↑ ↑
// w.at head
//
// but then the file should not be normally accessed in that case.
//
// -> reject such watches with an error
return fmt.Errorf("file epoch detected @%s in between (at,head=@%s]", δfile.Rev, headAt)
}
for blk := range δfile.Blocks {
_, already := toPin[blk]
if already {
......@@ -1714,7 +1756,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX adjust wcfs tests to not require only accessed
// blocks to be in setup pins? But that would mean that
// potentially more blocks would be potentially
// _unneccessarily_ pinned if they are not going to be
// _unnecessarily_ pinned if they are not going to be
// accessed at all.
if !f.accessed.Has(blk) {
continue
......@@ -2088,7 +2130,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
root.revMu.Unlock()
if already {
// XXX race wrt simlutaneous "FORGET @<rev>" ?
// XXX race wrt simultaneous "FORGET @<rev>" ?
return revDir, nil
}
......@@ -2533,7 +2575,7 @@ func _main() (err error) {
}
// wait for unmount
// XXX the kernel does not sentd FORGETs on unmount - release left node resources ourselves?
// XXX the kernel does not send FORGETs on unmount - release left node resources ourselves?
<-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl)
return nil // XXX serveErr | zwatchErr ?
......
......@@ -1132,7 +1132,8 @@ def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
#
# If the block is hole - (b'', at0) is returned. XXX -> @z64?
# XXX ret for when the file did not existed at all? blk was after file size?
# XXX ret for when the file did not exist at all?
# XXX ret ----//---- blk was after file size?
@func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None:
......@@ -1417,7 +1418,7 @@ def test_wcfs_watch_robust():
wl.close()
# verify that `watch file @at` -> error, for @at when the file did not exist.
@xfail # check that file exists @at
@xfail # check that file exists @at XXX
@func
def test_wcfs_watch_before_create():
t = tDB(); zf = t.zfile
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment