Commit 30f5ddc7 authored by Kirill Smelkov's avatar Kirill Smelkov

ΔFtail += .Epoch in δf

Unfortunately I could not avoid computing diff for ZBigFile objects
themselves, because after recent ΔFtail and ΔZtail fixes, wcfs tests
started to fail becase in the following scenario

    δFtail is created                        @at0
    file is created                          @at1
    file starts to be tracked when δFtail is @at2
    δFtail query is made about file block -> boom reports @at0, because the file does not exist there at all

To fix this ΔFtail now detects when ZBigFile objects are changed
themselves and indicate such a change with specifal δf with
δf.Epoch=true .

* t2+δfepoch: (38 commits)
  X ΔFtail: Rebuild vδE after first track
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  ...
parents 7547a4d2 bf3ace66
...@@ -453,8 +453,8 @@ func diffX(ctx context.Context, a, b Node, δZTC setOid, trackSet blib.PPTreeSub ...@@ -453,8 +453,8 @@ func diffX(ctx context.Context, a, b Node, δZTC setOid, trackSet blib.PPTreeSub
// a, b point to top of subtrees @old and @new revisions. // a, b point to top of subtrees @old and @new revisions.
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new). // δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
func diffT(ctx context.Context, A, B *Tree, δZTC setOid, trackSet blib.PPTreeSubSet) (δ map[Key]ΔValue, δtrack *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) { func diffT(ctx context.Context, A, B *Tree, δZTC setOid, trackSet blib.PPTreeSubSet) (δ map[Key]ΔValue, δtrack *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
tracefDiff(" diffT %s %s\n", xidOf(A), xidOf(B)) tracefDiff(" diffT %s %s\n", xzodb.XidOf(A), xzodb.XidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xidOf(A), xidOf(B)) defer xerr.Contextf(&err, "diffT %s %s", xzodb.XidOf(A), xzodb.XidOf(B))
δ = map[Key]ΔValue{} δ = map[Key]ΔValue{}
δtrack = blib.NewΔPPTreeSubSet() δtrack = blib.NewΔPPTreeSubSet()
...@@ -887,8 +887,8 @@ func δMerge(δ, δ2 map[Key]ΔValue) error { ...@@ -887,8 +887,8 @@ func δMerge(δ, δ2 map[Key]ΔValue) error {
// diffB computes difference in between two buckets. // diffB computes difference in between two buckets.
// see diffX for details. // see diffX for details.
func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) { func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
tracefDiff(" diffB %s %s\n", xidOf(a), xidOf(b)) tracefDiff(" diffB %s %s\n", xzodb.XidOf(a), xzodb.XidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xidOf(a), xidOf(b)) defer xerr.Contextf(&err, "diffB %s %s", xzodb.XidOf(a), xzodb.XidOf(b))
// XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately) // XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately)
var av []BucketEntry var av []BucketEntry
...@@ -952,13 +952,10 @@ func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) { ...@@ -952,13 +952,10 @@ func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
// zgetNodeOrNil returns btree node corresponding to zconn.Get(oid) . // zgetNodeOrNil returns btree node corresponding to zconn.Get(oid) .
// if the node does not exist, (nil, ok) is returned. // if the node does not exist, (nil, ok) is returned.
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ Node, err error) { func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (node Node, err error) {
defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At()) defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At())
xnode, err := zconn.Get(ctx, oid) xnode, err := xzodb.ZGetOrNil(ctx, zconn, oid)
if err != nil { if xnode == nil || err != nil {
if xzodb.IsErrNoData(err) {
err = nil
}
return nil, err return nil, err
} }
...@@ -966,20 +963,6 @@ func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ ...@@ -966,20 +963,6 @@ func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_
if !ok { if !ok {
return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode)) return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode))
} }
// activate the node to find out it really exists
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = node.PActivate(ctx)
if err != nil {
if xzodb.IsErrNoData(err) {
return nil, nil
}
return nil, err
}
node.PDeactivate()
return node, nil return node, nil
} }
...@@ -993,15 +976,6 @@ func vOid(xvalue interface{}) (zodb.Oid, error) { ...@@ -993,15 +976,6 @@ func vOid(xvalue interface{}) (zodb.Oid, error) {
return value.POid(), nil return value.POid(), nil
} }
// xidOf return string representation of object xid.
func xidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
func (rn *nodeInRange) String() string { func (rn *nodeInRange) String() string {
done := " "; if rn.done { done = "*" } done := " "; if rn.done { done = "*" }
return fmt.Sprintf("%s%s%s", done, rn.keycov, vnode(rn.node)) return fmt.Sprintf("%s%s%s", done, rn.keycov, vnode(rn.node))
......
...@@ -266,6 +266,16 @@ func (t *Commit) XGetBlkData(oid zodb.Oid) string { ...@@ -266,6 +266,16 @@ func (t *Commit) XGetBlkData(oid zodb.Oid) string {
return zblki.Data return zblki.Data
} }
// XGetBlkByName returns ZBlk info associated with ZBlk<name>
func (t *Commit) XGetBlkByName(name string) (zodb.Oid, ZBlkInfo) {
for oid, zblki := range t.ZBlkTab {
if zblki.Name == name {
return oid, zblki
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid, ZBlkInfo{} // XXX should be not needed
}
// xGetTree loads Tree from zurl@at->obj<root>. // xGetTree loads Tree from zurl@at->obj<root>.
// //
......
...@@ -221,6 +221,18 @@ def TreesSrv(zstor, r): ...@@ -221,6 +221,18 @@ def TreesSrv(zstor, r):
xprint("%s" % ashex(head)) xprint("%s" % ashex(head))
continue continue
# øf command to delete the file
if treetxt == "øf":
head = commitDelete(zfile, subj)
xprint("%s" % ashex(head))
continue
# make sure we continue with undeleted ztree/zfile
if deleted(ztree):
undelete(ztree)
if deleted(zfile):
undelete(zfile)
# t... D... commands to natively commit updates to tree and values # t... D... commands to natively commit updates to tree and values
if treetxt.startswith('t'): if treetxt.startswith('t'):
t, D = treetxt.split() t, D = treetxt.split()
...@@ -480,8 +492,18 @@ def commitDelete(obj, description): # -> tid ...@@ -480,8 +492,18 @@ def commitDelete(obj, description): # -> tid
# reset transaction to a new one # reset transaction to a new one
transaction.begin() transaction.begin()
obj._v_deleted = True
return tid return tid
# deleted reports whether obj was deleted via commitDelete.
def deleted(obj): # -> bool
return getattr(obj, '_v_deleted', False)
# undelete forces recreation for obj that was previously deleted via commitDelete.
def undelete(obj):
obj._p_changed = True
del obj._v_deleted
# ztreetxt returns text representation of a ZODB tree. # ztreetxt returns text representation of a ZODB tree.
@func(ZCtx) @func(ZCtx)
......
...@@ -305,24 +305,42 @@ func (δBtail *ΔBtail) rebuildAll() (err error) { ...@@ -305,24 +305,42 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail rebuildAll") defer xerr.Context(&err, "ΔBtail rebuildAll")
// XXX locking // XXX locking
trackNewRoots := δBtail.trackNewRoots tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots)
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), trackNewRoots) for root := range δBtail.trackNewRoots {
delete(δBtail.trackNewRoots, root)
δBtail.rebuild1(root)
}
for root := range trackNewRoots { return nil
δTtail := δBtail.vδTbyRoot[root] // must be there }
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
if err != nil { // rebuild1IfNeeded rebuilds ΔBtail for single root if that root needs rebuilding.
return err func (δBtail *ΔBtail) rebuild1IfNeeded(root zodb.Oid) error {
} // XXX locking
δBtail.trackSet.UnionInplace(δtrackSet) _, ok := δBtail.trackNewRoots[root]
δBtail.vδBroots_Update(root, δrevSet) if !ok {
return nil
} }
δBtail.trackNewRoots = setOid{} delete(δBtail.trackNewRoots, root)
return δBtail.rebuild1(root)
}
// rebuild1 rebuilds ΔBtail for single root.
func (δBtail *ΔBtail) rebuild1(root zodb.Oid) error {
// XXX locking
δTtail := δBtail.vδTbyRoot[root] // must be there
δtrackSet, δrevSet, err := δTtail.rebuild(root, δBtail.δZtail, δBtail.db)
if err != nil {
return err
}
δBtail.trackSet.UnionInplace(δtrackSet)
δBtail.vδBroots_Update(root, δrevSet)
return nil return nil
} }
// rebuild rebuilds ΔTtail taking trackNew requests into account. // rebuild rebuilds ΔTtail taking trackNew requests into account.
// //
// It returns: // It returns:
...@@ -333,7 +351,7 @@ func (δBtail *ΔBtail) rebuildAll() (err error) { ...@@ -333,7 +351,7 @@ func (δBtail *ΔBtail) rebuildAll() (err error) {
// //
// XXX place // XXX place
func (δTtail *ΔTtail) rebuild(root zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (δtrackSet blib.PPTreeSubSet, δrevSet setTid, err error) { func (δTtail *ΔTtail) rebuild(root zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (δtrackSet blib.PPTreeSubSet, δrevSet setTid, err error) {
defer xerr.Context(&err, "ΔTtail rebuild") defer xerr.Contextf(&err, "ΔTtail<%s> rebuild", root)
// XXX locking // XXX locking
tracefΔBtail("\nRebuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head()) tracefΔBtail("\nRebuild %s @%s .. @%s\n", root, δZtail.Tail(), δZtail.Head())
...@@ -732,6 +750,7 @@ func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) { ...@@ -732,6 +750,7 @@ func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
} }
func (δTtail *ΔTtail) forgetPast(revCut zodb.Tid) { func (δTtail *ΔTtail) forgetPast(revCut zodb.Tid) {
// XXX locking
// XXX lastRevOf // XXX lastRevOf
icut := 0 icut := 0
...@@ -760,13 +779,18 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb. ...@@ -760,13 +779,18 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
// XXX key not tracked -> panic // XXX key not tracked -> panic
// XXX at not ∈ (tail, head] -> panic // XXX at not ∈ (tail, head] -> panic
// XXX dirty -> rebuild // XXX locking
rootAt := root.PJar().At() rootAt := root.PJar().At()
if rootAt != δBtail.Head() { if rootAt != δBtail.Head() {
panicf("δBtail: root.at (@%s) != head (@%s)", rootAt, δBtail.Head()) panicf("δBtail: root.at (@%s) != head (@%s)", rootAt, δBtail.Head())
} }
err = δBtail.rebuild1IfNeeded(root.POid())
if err != nil {
return
}
δTtail := δBtail.vδTbyRoot[root.POid()] δTtail := δBtail.vδTbyRoot[root.POid()]
if δTtail == nil { if δTtail == nil {
panicf("δBtail: root<%s> not tracked", root.POid()) panicf("δBtail: root<%s> not tracked", root.POid())
...@@ -805,6 +829,9 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb. ...@@ -805,6 +829,9 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
// @tail[key] is not present - key was not changing in (tail, head]. // @tail[key] is not present - key was not changing in (tail, head].
// since at ∈ (tail, head] we can use @head[key] as the result // since at ∈ (tail, head] we can use @head[key] as the result
xvalue, ok, err := root.Get(ctx, key) xvalue, ok, err := root.Get(ctx, key)
if !ok {
value = VDEL
}
if err != nil || !ok { if err != nil || !ok {
return return
} }
...@@ -837,7 +864,12 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb. ...@@ -837,7 +864,12 @@ func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonly*/[]ΔTree { func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonly*/[]ΔTree {
xtail.AssertSlice(δBtail, lo, hi) xtail.AssertSlice(δBtail, lo, hi)
// XXX locking // XXX locking
// XXX rebuild
err := δBtail.rebuild1IfNeeded(root)
if err != nil {
panic(err) // XXX
}
δTtail, ok := δBtail.vδTbyRoot[root] δTtail, ok := δBtail.vδTbyRoot[root]
if !ok { if !ok {
return []ΔTree{} return []ΔTree{}
...@@ -874,6 +906,11 @@ func (δBtail *ΔBtail) ΔZtail() /*readonly*/*zodb.ΔTail { ...@@ -874,6 +906,11 @@ func (δBtail *ΔBtail) ΔZtail() /*readonly*/*zodb.ΔTail {
return δBtail.δZtail return δBtail.δZtail
} }
// DB returns database handle that δBtail is using to access ZODB.
func (δBtail *ΔBtail) DB() *zodb.DB {
return δBtail.db
}
func tracefΔBtail(format string, argv ...interface{}) { func tracefΔBtail(format string, argv ...interface{}) {
if traceΔBtail { if traceΔBtail {
......
...@@ -1260,17 +1260,6 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1260,17 +1260,6 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want) t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
} }
// zblkByName returns oid of ZBlk that has .Name == name
zblkByName := func(name string) zodb.Oid {
for oid, zblki := range t0.ZBlkTab {
if zblki.Name == name {
return oid
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid // XXX should be not needed
}
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At) s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At) s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At) s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
...@@ -1286,8 +1275,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1286,8 +1275,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
assertvδT("t2.s22", s22) assertvδT("t2.s22", s22)
// sXX should be all aliased to vδT // sXX should be all aliased to vδT
gg := zblkByName("g") gg, _ := t0.XGetBlkByName("g")
hh := zblkByName("h") hh, _ := t0.XGetBlkByName("h")
vδT[0].Rev = t0.At; δkv0 := vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{11:{gg,gg}} vδT[0].Rev = t0.At; δkv0 := vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{11:{gg,gg}}
vδT[1].Rev = t0.At; δkv1 := vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{12:{hh,hh}} vδT[1].Rev = t0.At; δkv1 := vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{12:{hh,hh}}
assertvδT("t2.vδT*", vδT, ΔT{t0.At, δ{11:{g,g}}}, ΔT{t0.At, δ{12:{h,h}}}) assertvδT("t2.vδT*", vδT, ΔT{t0.At, δ{11:{g,g}}}, ΔT{t0.At, δ{12:{h,h}}})
...@@ -1331,8 +1320,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1331,8 +1320,8 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
assertvδT("t12.s22_", s22_) assertvδT("t12.s22_", s22_)
// sXX_ should be all aliased to vδT, but not sXX // sXX_ should be all aliased to vδT, but not sXX
bb := zblkByName("b") bb, _ := t0.XGetBlkByName("b")
cc := zblkByName("c") cc, _ := t0.XGetBlkByName("c")
vδT[0].Rev = t0.At; δkv0 = vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{111:{bb,bb}} vδT[0].Rev = t0.At; δkv0 = vδT[0].ΔKV; vδT[0].ΔKV = map[Key]ΔValue{111:{bb,bb}}
vδT[1].Rev = t0.At; δkv1 = vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{112:{cc,cc}} vδT[1].Rev = t0.At; δkv1 = vδT[1].ΔKV; vδT[1].ΔKV = map[Key]ΔValue{112:{cc,cc}}
assertvδT("t12.vδT*", vδT, ΔT{t0.At, δ{111:{b,b}}}, ΔT{t0.At, δ{112:{c,c}}}) assertvδT("t12.vδT*", vδT, ΔT{t0.At, δ{111:{b,b}}}, ΔT{t0.At, δ{112:{c,c}}})
......
...@@ -24,8 +24,10 @@ import ( ...@@ -24,8 +24,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"reflect"
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -82,6 +84,33 @@ func ZOpen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn, ...@@ -82,6 +84,33 @@ func ZOpen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn,
}, nil }, nil
} }
// ZGetOrNil returns zconn.Get(oid), or (nil,ok) if the object does not exist.
func ZGetOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ zodb.IPersistent, err error) {
defer xerr.Contextf(&err, "zget %s@%s", oid, zconn.At())
obj, err := zconn.Get(ctx, oid)
if err != nil {
if IsErrNoData(err) {
err = nil
}
return nil, err
}
// activate the object to find out it really exists
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = obj.PActivate(ctx)
if err != nil {
if IsErrNoData(err) {
return nil, nil
}
return nil, err
}
obj.PDeactivate()
return obj, nil
}
// IsErrNoData returns whether err is due to NoDataError or NoObjectError. // IsErrNoData returns whether err is due to NoDataError or NoObjectError.
func IsErrNoData(err error) bool { func IsErrNoData(err error) bool {
var eNoData *zodb.NoDataError var eNoData *zodb.NoDataError
...@@ -96,3 +125,12 @@ func IsErrNoData(err error) bool { ...@@ -96,3 +125,12 @@ func IsErrNoData(err error) bool {
return false return false
} }
} }
// XidOf return string representation of object xid.
func XidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
This diff is collapsed.
This diff is collapsed.
...@@ -882,28 +882,40 @@ retry: ...@@ -882,28 +882,40 @@ retry:
sort.Slice(blkv, func(i, j int) bool { sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j] return blkv[i] < blkv[j]
}) })
size := " " flags := ""
if δfile.Size { if δfile.Size {
size = "S" flags += "S"
} }
log.Infof("S: \t- %s\t%s %v\n", foid, size, blkv) if δfile.Epoch {
flags += "E"
}
log.Infof("S: \t- %s\t%2s %v\n", foid, flags, blkv)
} }
log.Infof("\n\n") log.Infof("\n\n")
} }
// invalidate kernel cache for file data
wg := xsync.NewWorkGroup(ctx) wg := xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile { for foid, δfile := range δF.ByFile {
// // XXX needed?
// // XXX even though δBtail is complete, not all ZBlk are present here
// file.δtail.Append(δF.Rev, δfile.Blocks.Elements())
// file was requested to be tracked -> it must be present in fileTab // file was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[foid] file := bfdir.fileTab[foid]
for blk := range δfile.Blocks {
blk := blk if δfile.Epoch {
wg.Go(func(ctx context.Context) error { // XXX while invalidating whole file at epoch is easy,
return file.invalidateBlk(ctx, blk) // it becomes not so easy to handle isolation if epochs
}) // could be present. For this reason we forbid changes
// to ZBigFile objects for now.
return fmt.Errorf("ZBigFile<%s> changed @%s", foid, δF.Rev)
// wg.Go(func(ctx context.Context) error {
// return file.invalidateAll() // NOTE does not accept ctx
// })
} else {
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
return file.invalidateBlk(ctx, blk)
})
}
} }
} }
err = wg.Wait() err = wg.Wait()
...@@ -1112,6 +1124,21 @@ func (f *BigFile) invalidateAttr() (err error) { ...@@ -1112,6 +1124,21 @@ func (f *BigFile) invalidateAttr() (err error) {
return nil return nil
} }
// invalidateAll invalidates file attributes and all file data in kernel cache.
//
// complements invalidateAttr and invalidateBlk and is used to completely reset
// kernel file cache on ΔFtail epoch.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAll() (err error) {
defer xerr.Contextf(&err, "%s: invalidate all", f.path())
fsconn := gfsconn
st := fsconn.FileNotify(f.Inode(), 0, -1) // metadata + all data
if st != fuse.OK {
return syscall.Errno(st)
}
return nil
}
// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel // lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock. // and won't change until unlock.
...@@ -1291,7 +1318,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1291,7 +1318,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// and thus would trigger DB access again. // and thus would trigger DB access again.
// //
// TODO if direct-io: don't touch pagecache // TODO if direct-io: don't touch pagecache
// TODO upload parts only not covered by currrent read (not to e.g. wait for page lock) // TODO upload parts only not covered by current read (not to e.g. wait for page lock)
// TODO skip upload completely if read is wide to cover whole blksize // TODO skip upload completely if read is wide to cover whole blksize
go f.uploadBlk(blk, loading) go f.uploadBlk(blk, loading)
...@@ -1681,7 +1708,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1681,7 +1708,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// rlocked during pin setup. // rlocked during pin setup.
// //
// δ δ // δ δ
// ----x----.------------]----x---- // ────x────.────────────]────x────
// ↑ ↑ // ↑ ↑
// w.at head // w.at head
// //
...@@ -1700,6 +1727,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1700,6 +1727,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
δFtail := bfdir.δFtail δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
// the file if w.at is before that time:
//
// create file
// ────.────────x────────]────
// ↑ ↑
// w.at head
//
// but then the file should not be normally accessed in that case.
//
// -> reject such watches with an error
return fmt.Errorf("file epoch detected @%s in between (at,head=@%s]", δfile.Rev, headAt)
}
for blk := range δfile.Blocks { for blk := range δfile.Blocks {
_, already := toPin[blk] _, already := toPin[blk]
if already { if already {
...@@ -1714,7 +1756,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1714,7 +1756,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX adjust wcfs tests to not require only accessed // XXX adjust wcfs tests to not require only accessed
// blocks to be in setup pins? But that would mean that // blocks to be in setup pins? But that would mean that
// potentially more blocks would be potentially // potentially more blocks would be potentially
// _unneccessarily_ pinned if they are not going to be // _unnecessarily_ pinned if they are not going to be
// accessed at all. // accessed at all.
if !f.accessed.Has(blk) { if !f.accessed.Has(blk) {
continue continue
...@@ -2088,7 +2130,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) { ...@@ -2088,7 +2130,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
root.revMu.Unlock() root.revMu.Unlock()
if already { if already {
// XXX race wrt simlutaneous "FORGET @<rev>" ? // XXX race wrt simultaneous "FORGET @<rev>" ?
return revDir, nil return revDir, nil
} }
...@@ -2533,7 +2575,7 @@ func _main() (err error) { ...@@ -2533,7 +2575,7 @@ func _main() (err error) {
} }
// wait for unmount // wait for unmount
// XXX the kernel does not sentd FORGETs on unmount - release left node resources ourselves? // XXX the kernel does not send FORGETs on unmount - release left node resources ourselves?
<-serveCtx.Done() <-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl) log.Infof("stop %q %q", mntpt, zurl)
return nil // XXX serveErr | zwatchErr ? return nil // XXX serveErr | zwatchErr ?
......
...@@ -1132,7 +1132,8 @@ def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq ...@@ -1132,7 +1132,8 @@ def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state. # _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
# #
# If the block is hole - (b'', at0) is returned. XXX -> @z64? # If the block is hole - (b'', at0) is returned. XXX -> @z64?
# XXX ret for when the file did not existed at all? blk was after file size? # XXX ret for when the file did not existed at all?
# XXX ret ----//---- blk was after file size?
@func(tDB) @func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev) def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None: if at is None:
...@@ -1417,7 +1418,7 @@ def test_wcfs_watch_robust(): ...@@ -1417,7 +1418,7 @@ def test_wcfs_watch_robust():
wl.close() wl.close()
# verify that `watch file @at` -> error, for @at when file did not existed. # verify that `watch file @at` -> error, for @at when file did not existed.
@xfail # check that file exists @at @xfail # check that file exists @at XXX
@func @func
def test_wcfs_watch_before_create(): def test_wcfs_watch_before_create():
t = tDB(); zf = t.zfile t = tDB(); zf = t.zfile
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment