Commit 30f5ddc7 authored by Kirill Smelkov

ΔFtail += .Epoch in δf

Unfortunately I could not avoid computing diff for ZBigFile objects
themselves, because after recent ΔFtail and ΔZtail fixes wcfs tests
started to fail in the following scenario:

    δFtail is created                        @at0
    file is created                          @at1
    file starts to be tracked when δFtail is @at2
    δFtail query is made about file block -> boom reports @at0, because the file does not exist there at all

To fix this, ΔFtail now detects when ZBigFile objects themselves are
changed, and indicates such a change with a special δf that has
δf.Epoch=true .
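
To make the new δf shape concrete, here is a minimal self-contained Go
sketch of how a consumer of ΔFtail query results would tell an epoch apart
from a regular data change. It is illustrative only: the field names follow
the δfile.Epoch/.Blocks/.Size usage in the diff below, while Tid, setI64 and
the example revisions are stand-ins, not the real wcfs types.

    package main

    import "fmt"

    // Stand-ins for zodb.Tid and the block-number set type used by ΔFtail.
    type Tid uint64
    type setI64 map[int64]struct{}

    // ΔFile sketches the δf shape: Epoch=true marks a change of the
    // ZBigFile object itself (e.g. its creation), as opposed to a change
    // of only its data blocks.
    type ΔFile struct {
        Rev    Tid    // revision of the change
        Epoch  bool   // ZBigFile object itself changed
        Blocks setI64 // blocks whose data changed
        Size   bool   // whether file size changed
    }

    func main() {
        // what a (tail, head] query could yield for the scenario above
        vδf := []ΔFile{
            {Rev: 1, Epoch: true},                              // ~ file created @at1
            {Rev: 2, Blocks: setI64{0: {}, 3: {}}, Size: true}, // data-only change
        }
        for _, δf := range vδf {
            if δf.Epoch {
                // nothing known before δf.Rev applies to this file anymore
                fmt.Printf("@%d: epoch -> reset all derived file state\n", δf.Rev)
                continue
            }
            for blk := range δf.Blocks {
                fmt.Printf("@%d: invalidate block %d\n", δf.Rev, blk)
            }
        }
    }

The watcher part of the diff applies exactly this split: on δfile.Epoch it
currently refuses the change, while plain block changes go through per-block
invalidation.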

* t2+δfepoch: (38 commits)
  X ΔFtail: Rebuild vδE after first track
  ...
parents 7547a4d2 bf3ace66
......@@ -453,8 +453,8 @@ func diffX(ctx context.Context, a, b Node, δZTC setOid, trackSet blib.PPTreeSub
// a, b point to top of subtrees @old and @new revisions.
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
func diffT(ctx context.Context, A, B *Tree, δZTC setOid, trackSet blib.PPTreeSubSet) (δ map[Key]ΔValue, δtrack *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
tracefDiff(" diffT %s %s\n", xidOf(A), xidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xidOf(A), xidOf(B))
tracefDiff(" diffT %s %s\n", xzodb.XidOf(A), xzodb.XidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xzodb.XidOf(A), xzodb.XidOf(B))
δ = map[Key]ΔValue{}
δtrack = blib.NewΔPPTreeSubSet()
......@@ -887,8 +887,8 @@ func δMerge(δ, δ2 map[Key]ΔValue) error {
// diffB computes difference in between two buckets.
// see diffX for details.
func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
tracefDiff(" diffB %s %s\n", xidOf(a), xidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xidOf(a), xidOf(b))
tracefDiff(" diffB %s %s\n", xzodb.XidOf(a), xzodb.XidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xzodb.XidOf(a), xzodb.XidOf(b))
// XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately)
var av []BucketEntry
......@@ -952,13 +952,10 @@ func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
// zgetNodeOrNil returns btree node corresponding to zconn.Get(oid) .
// if the node does not exist, (nil, nil) is returned.
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ Node, err error) {
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (node Node, err error) {
defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At())
xnode, err := zconn.Get(ctx, oid)
if err != nil {
if xzodb.IsErrNoData(err) {
err = nil
}
xnode, err := xzodb.ZGetOrNil(ctx, zconn, oid)
if xnode == nil || err != nil {
return nil, err
}
......@@ -966,20 +963,6 @@ func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_
if !ok {
return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode))
}
// activate the node to find out it really exists
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = node.PActivate(ctx)
if err != nil {
if xzodb.IsErrNoData(err) {
return nil, nil
}
return nil, err
}
node.PDeactivate()
return node, nil
}
......@@ -993,15 +976,6 @@ func vOid(xvalue interface{}) (zodb.Oid, error) {
return value.POid(), nil
}
// xidOf return string representation of object xid.
func xidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
func (rn *nodeInRange) String() string {
done := " "; if rn.done { done = "*" }
return fmt.Sprintf("%s%s%s", done, rn.keycov, vnode(rn.node))
......
......@@ -266,6 +266,16 @@ func (t *Commit) XGetBlkData(oid zodb.Oid) string {
return zblki.Data
}
// XGetBlkByName returns oid and ZBlk info associated with ZBlk<name>
func (t *Commit) XGetBlkByName(name string) (zodb.Oid, ZBlkInfo) {
for oid, zblki := range t.ZBlkTab {
if zblki.Name == name {
return oid, zblki
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid, ZBlkInfo{} // XXX should not be needed
}
// xGetTree loads Tree from zurl@at->obj<root>.
//
......
......@@ -221,6 +221,18 @@ def TreesSrv(zstor, r):
xprint("%s" % ashex(head))
continue
# øf command to delete the file
if treetxt == "øf":
head = commitDelete(zfile, subj)
xprint("%s" % ashex(head))
continue
# make sure we continue with undeleted ztree/zfile
if deleted(ztree):
undelete(ztree)
if deleted(zfile):
undelete(zfile)
# t... D... commands to natively commit updates to tree and values
if treetxt.startswith('t'):
t, D = treetxt.split()
......@@ -480,8 +492,18 @@ def commitDelete(obj, description): # -> tid
# reset transaction to a new one
transaction.begin()
obj._v_deleted = True
return tid
# deleted reports whether obj was deleted via commitDelete.
def deleted(obj): # -> bool
return getattr(obj, '_v_deleted', False)
# undelete forces recreation of obj that was previously deleted via commitDelete.
def undelete(obj):
obj._p_changed = True
del obj._v_deleted
# ztreetxt returns text representation of a ZODB tree.
@func(ZCtx)
......
......@@ -305,24 +305,42 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail rebuildAll")
// XXX locking
trackNewRoots := δBtail.trackNewRoots
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots)
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), trackNewRoots)
for root := range δBtail.trackNewRoots {
delete(δBtail.trackNewRoots, root)
δBtail.rebuild1(root)
}
for root := range trackNewRoots {
δTtail := δBtail.vδTbyRoot[root] // must be there
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
if err != nil {
return err
}
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
return nil
}
// rebuild1IfNeeded rebuilds ΔBtail for a single root if that root needs rebuilding.
func (δBtail *ΔBtail) rebuild1IfNeeded(root zodb.Oid) error {
// XXX locking
_, ok := δBtail.trackNewRoots[root]
if !ok {
return nil
}
δBtail.trackNewRoots = setOid{}
delete(δBtail.trackNewRoots, root)
return δBtail.rebuild1(root)
}
// rebuild1 rebuilds ΔBtail for a single root.
func (δBtail *ΔBtail) rebuild1(root zodb.Oid) error {
// XXX locking
δTtail := δBtail.vδTbyRoot[root] // must be there
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
if err != nil {
return err
}
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
return nil
}
// rebuild rebuilds ΔTtail taking trackNew requests into account.
//
// It returns:
......@@ -333,7 +351,7 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
//
// XXX place
func (δTtail *ΔTtail) rebuild(root zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (δtrackSet blib.PPTreeSubSet, δrevSet setTid, err error) {
defer xerr.Context(&err, "ΔTtail rebuild")
defer xerr.Contextf(&err, "ΔTtail<%s> rebuild", root)
// XXX locking
tracefΔBtail("\nRebuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
......@@ -732,6 +750,7 @@ func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
}
func (δTtail *ΔTtail) forgetPast(revCut zodb.Tid) {
// XXX locking
// XXX lastRevOf
icut := 0
......@@ -760,13 +779,18 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
// XXX key not tracked -> panic
// XXX at not ∈ (tail, head] -> panic
// XXX dirty -> rebuild
// XXX locking
rootAt := root.PJar().At()
if rootAt != δBtail.Head() {
panicf("δBtail: root.at (@%s) != head (@%s)", rootAt, δBtail.Head())
}
err = δBtail.rebuild1IfNeeded(root.POid())
if err != nil {
return
}
δTtail := δBtail.vδTbyRoot[root.POid()]
if δTtail == nil {
panicf("δBtail: root<%s> not tracked", root.POid())
......@@ -805,6 +829,9 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
// @tail[key] is not present - key was not changing in (tail, head].
// since at ∈ (tail, head] we can use @head[key] as the result
xvalue, ok, err := root.Get(ctx, key)
if !ok {
value = VDEL
}
if err != nil || !ok {
return
}
......@@ -837,7 +864,12 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonly*/[]ΔTree {
xtail.AssertSlice(δBtail, lo, hi)
// XXX locking
// XXX rebuild
err := δBtail.rebuild1IfNeeded(root)
if err != nil {
panic(err) // XXX
}
δTtail, ok := δBtail.vδTbyRoot[root]
if !ok {
return []ΔTree{}
......@@ -874,6 +906,11 @@ func (δBtail *ΔBtail) ΔZtail() /*readonly*/*zodb.ΔTail {
return δBtail.δZtail
}
// DB returns database handle that δBtail is using to access ZODB.
func (δBtail *ΔBtail) DB() *zodb.DB {
return δBtail.db
}
func tracefΔBtail(format string, argv ...interface{}) {
if traceΔBtail {
......
......@@ -1260,17 +1260,6 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
}
// zblkByName returns oid of ZBlk that has .Name == name
zblkByName := func(name string) zodb.Oid {
for oid, zblki := range t0.ZBlkTab {
if zblki.Name == name {
return oid
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid // XXX should be not needed
}
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
......@@ -1286,8 +1275,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
assertvδT("t2.s22", s22)
// sXX should be all aliased to vδT
gg := zblkByName("g")
hh := zblkByName("h")
gg, _ := t0.XGetBlkByName("g")
hh, _ := t0.XGetBlkByName("h")
vδT[0].Rev = t0.At; δkv0 := vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{11:{gg,gg}}
vδT[1].Rev = t0.At; δkv1 := vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{12:{hh,hh}}
assertvδT("t2.vδT*", vδT, ΔT{t0.At, δ{11:{g,g}}}, ΔT{t0.At, δ{12:{h,h}}})
......@@ -1331,8 +1320,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
assertvδT("t12.s22_", s22_)
// sXX_ should be all aliased to vδT, but not sXX
bb := zblkByName("b")
cc := zblkByName("c")
bb, _ := t0.XGetBlkByName("b")
cc, _ := t0.XGetBlkByName("c")
vδT[0].Rev = t0.At; δkv0 = vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{111:{bb,bb}}
vδT[1].Rev = t0.At; δkv1 = vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{112:{cc,cc}}
assertvδT("t12.vδT*", vδT, ΔT{t0.At, δ{111:{b,b}}}, ΔT{t0.At, δ{112:{c,c}}})
......
......@@ -24,8 +24,10 @@ import (
"context"
"errors"
"fmt"
"reflect"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -82,6 +84,33 @@ func ZOpen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn,
}, nil
}
// ZGetOrNil returns zconn.Get(oid), or (nil, nil) if the object does not exist.
func ZGetOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ zodb.IPersistent, err error) {
defer xerr.Contextf(&err, "zget %s@%s", oid, zconn.At())
obj, err := zconn.Get(ctx, oid)
if err != nil {
if IsErrNoData(err) {
err = nil
}
return nil, err
}
// activate the object to find out whether it really exists:
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = obj.PActivate(ctx)
if err != nil {
if IsErrNoData(err) {
return nil, nil
}
return nil, err
}
obj.PDeactivate()
return obj, nil
}
// IsErrNoData returns whether err is due to NoDataError or NoObjectError.
func IsErrNoData(err error) bool {
var eNoData *zodb.NoDataError
......@@ -96,3 +125,12 @@ func IsErrNoData(err error) bool {
return false
}
}
// XidOf returns string representation of object xid.
func XidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
......@@ -882,28 +882,40 @@ retry:
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
})
size := " "
flags := ""
if δfile.Size {
size = "S"
flags += "S"
}
log.Infof("S: \t- %s\t%s %v\n", foid, size, blkv)
if δfile.Epoch {
flags += "E"
}
log.Infof("S: \t- %s\t%2s %v\n", foid, flags, blkv)
}
log.Infof("\n\n")
}
// invalidate kernel cache for file data
wg := xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile {
// // XXX needed?
// // XXX even though δBtail is complete, not all ZBlk are present here
// file.δtail.Append(δF.Rev, δfile.Blocks.Elements())
// file was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[foid]
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
return file.invalidateBlk(ctx, blk)
})
if δfile.Epoch {
// XXX while invalidating whole file at epoch is easy,
// it becomes not so easy to handle isolation if epochs
// could be present. For this reason we forbid changes
// to ZBigFile objects for now.
return fmt.Errorf("ZBigFile<%s> changed @%s", foid, δF.Rev)
// wg.Go(func(ctx context.Context) error {
// return file.invalidateAll() // NOTE does not accept ctx
// })
} else {
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
return file.invalidateBlk(ctx, blk)
})
}
}
}
err = wg.Wait()
......@@ -1112,6 +1124,21 @@ func (f *BigFile) invalidateAttr() (err error) {
return nil
}
// invalidateAll invalidates file attributes and all file data in kernel cache.
//
// complements invalidateAttr and invalidateBlk and is used to completely reset
// kernel file cache on ΔFtail epoch.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAll() (err error) {
defer xerr.Contextf(&err, "%s: invalidate all", f.path())
fsconn := gfsconn
st := fsconn.FileNotify(f.Inode(), 0, -1) // metadata + all data
if st != fuse.OK {
return syscall.Errno(st)
}
return nil
}
// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock.
......@@ -1291,7 +1318,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// and thus would trigger DB access again.
//
// TODO if direct-io: don't touch pagecache
// TODO upload parts only not covered by currrent read (not to e.g. wait for page lock)
// TODO upload parts only not covered by current read (not to e.g. wait for page lock)
// TODO skip upload completely if read is wide to cover whole blksize
go f.uploadBlk(blk, loading)
......@@ -1681,7 +1708,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// rlocked during pin setup.
//
// δ δ
// ----x----.------------]----x----
// ────x────.────────────]────x────
// ↑ ↑
// w.at head
//
......@@ -1700,6 +1727,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
// the file if w.at is before that time:
//
// create file
// ────.────────x────────]────
// ↑ ↑
// w.at head
//
// but then the file should normally not be accessed in that case.
//
// -> reject such watches with an error
return fmt.Errorf("file epoch detected @%s in between (at,head=@%s]", δfile.Rev, headAt)
}
for blk := range δfile.Blocks {
_, already := toPin[blk]
if already {
......@@ -1714,7 +1756,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX adjust wcfs tests to not require only accessed
// blocks to be in setup pins? But that would mean that
// potentially more blocks would be potentially
// _unneccessarily_ pinned if they are not going to be
// _unnecessarily_ pinned if they are not going to be
// accessed at all.
if !f.accessed.Has(blk) {
continue
......@@ -2088,7 +2130,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
root.revMu.Unlock()
if already {
// XXX race wrt simlutaneous "FORGET @<rev>" ?
// XXX race wrt simultaneous "FORGET @<rev>" ?
return revDir, nil
}
......@@ -2533,7 +2575,7 @@ func _main() (err error) {
}
// wait for unmount
// XXX the kernel does not sentd FORGETs on unmount - release left node resources ourselves?
// XXX the kernel does not send FORGETs on unmount - release left node resources ourselves?
<-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl)
return nil // XXX serveErr | zwatchErr ?
......
......@@ -1132,7 +1132,8 @@ def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
#
# If the block is a hole - (b'', at0) is returned. XXX -> @z64?
# XXX ret for when the file did not existed at all? blk was after file size?
# XXX ret for when the file did not exist at all?
# XXX ret ----//---- blk was after file size?
@func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None:
......@@ -1417,7 +1418,7 @@ def test_wcfs_watch_robust():
wl.close()
# verify that `watch file @at` -> error, for @at when file did not exist.
@xfail # check that file exists @at
@xfail # check that file exists @at XXX
@func
def test_wcfs_watch_before_create():
t = tDB(); zf = t.zfile
......