Commit 0853cc9f authored by Kirill Smelkov's avatar Kirill Smelkov

X ΔFtail + tests

- Reimplement ΔFtail queries via gluing ΔBtail and ΔZtail data on the fly.
  This helps to avoid implementing complex rebuild logic in ΔFtail.
  The only place that needs to have that complexity is now ΔBtail, and there it
  already works draftly.

- Add ΔFtail tests.

- Add notion of epochs to ΔFtail. Epochs correspond to ZBigFile objects changes
  (creation and deletion). Unfortunately handling ZBigFile object changes
  turned out to be necessary to keep wcfs tests in passing state.

- Move common testing infrastructure - that is used by both ΔBtail and ΔFtail - to xbtreetest package.

- Add tests for ΔBtail.SliceByRootRev aliasing

- Lazy rebuild is now on

- ΔBtail.GetAt reworked

...

* t2: (112 commits)
  X wcfs: v↑ NEO/go (checkpoint)
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  X ΔFtail: Rebuild vδE after first track
  .
  .
  .
  .
  .
  .
  .
  .
  ...
parents f91982af d13f11ca
......@@ -10,7 +10,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
lab.nexedi.com/kirr/go123 v0.0.0-20210302025843-863c4602a230
lab.nexedi.com/kirr/neo/go v0.0.0-20210524152903-d02d65559752
lab.nexedi.com/kirr/neo/go v0.0.0-20210720105030-d99bf118d61a
)
// we use kirr/go-fuse@y/nodefs-cancel
......
......@@ -197,3 +197,5 @@ lab.nexedi.com/kirr/neo/go v0.0.0-20210503113049-7fba56df234c h1:+M4xtOKZqy7oC6L
lab.nexedi.com/kirr/neo/go v0.0.0-20210503113049-7fba56df234c/go.mod h1:llI3hcJJMACe+rYuXUfS5dljjwIrlBMfJ1ZeRcey96A=
lab.nexedi.com/kirr/neo/go v0.0.0-20210524152903-d02d65559752 h1:knRAqs0xLytZrxWHkCccg9xyAbAgzGFnyHE2rdg7onI=
lab.nexedi.com/kirr/neo/go v0.0.0-20210524152903-d02d65559752/go.mod h1:llI3hcJJMACe+rYuXUfS5dljjwIrlBMfJ1ZeRcey96A=
lab.nexedi.com/kirr/neo/go v0.0.0-20210720105030-d99bf118d61a h1:ex8P5oGhvDDp4y3HSIwGfWx++waqU9dKnrAkITMeWQs=
lab.nexedi.com/kirr/neo/go v0.0.0-20210720105030-d99bf118d61a/go.mod h1:llI3hcJJMACe+rYuXUfS5dljjwIrlBMfJ1ZeRcey96A=
......@@ -453,8 +453,8 @@ func diffX(ctx context.Context, a, b Node, δZTC setOid, trackSet blib.PPTreeSub
// a, b point to top of subtrees @old and @new revisions.
// δZTC is connected set of objects covering δZT (objects changed in this tree in old..new).
func diffT(ctx context.Context, A, B *Tree, δZTC setOid, trackSet blib.PPTreeSubSet) (δ map[Key]ΔValue, δtrack *blib.ΔPPTreeSubSet, δtkeycov *blib.RangedKeySet, err error) {
tracefDiff(" diffT %s %s\n", xidOf(A), xidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xidOf(A), xidOf(B))
tracefDiff(" diffT %s %s\n", xzodb.XidOf(A), xzodb.XidOf(B))
defer xerr.Contextf(&err, "diffT %s %s", xzodb.XidOf(A), xzodb.XidOf(B))
δ = map[Key]ΔValue{}
δtrack = blib.NewΔPPTreeSubSet()
......@@ -887,8 +887,8 @@ func δMerge(δ, δ2 map[Key]ΔValue) error {
// diffB computes difference in between two buckets.
// see diffX for details.
func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
tracefDiff(" diffB %s %s\n", xidOf(a), xidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xidOf(a), xidOf(b))
tracefDiff(" diffB %s %s\n", xzodb.XidOf(a), xzodb.XidOf(b))
defer xerr.Contextf(&err, "diffB %s %s", xzodb.XidOf(a), xzodb.XidOf(b))
// XXX oid can be InvalidOid for T/B... (i.e. B is part of T and is not yet committed separately)
var av []BucketEntry
......@@ -952,13 +952,10 @@ func diffB(ctx context.Context, a, b *Bucket) (δ map[Key]ΔValue, err error) {
// zgetNodeOrNil returns btree node corresponding to zconn.Get(oid) .
// if the node does not exist, (nil, ok) is returned.
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ Node, err error) {
func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (node Node, err error) {
defer xerr.Contextf(&err, "getnode %s@%s", oid, zconn.At())
xnode, err := zconn.Get(ctx, oid)
if err != nil {
if xzodb.IsErrNoData(err) {
err = nil
}
xnode, err := xzodb.ZGetOrNil(ctx, zconn, oid)
if xnode == nil || err != nil {
return nil, err
}
......@@ -966,20 +963,6 @@ func zgetNodeOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_
if !ok {
return nil, fmt.Errorf("unexpected type: %s", zodb.ClassOf(xnode))
}
// activate the node to find out it really exists
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = node.PActivate(ctx)
if err != nil {
if xzodb.IsErrNoData(err) {
return nil, nil
}
return nil, err
}
node.PDeactivate()
return node, nil
}
......@@ -993,15 +976,6 @@ func vOid(xvalue interface{}) (zodb.Oid, error) {
return value.POid(), nil
}
// xidOf return string representation of object xid.
func xidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
func (rn *nodeInRange) String() string {
done := " "; if rn.done { done = "*" }
return fmt.Sprintf("%s%s%s", done, rn.keycov, vnode(rn.node))
......
......@@ -26,13 +26,13 @@ import (
"strings"
)
// kvdiff returns difference in between kv1 and kv2.
// KVDiff returns difference in between kv1 and kv2.
const DEL = "ø" // DEL means deletion
type Δstring struct {
Old string
New string
}
func kvdiff(kv1, kv2 map[Key]string) map[Key]Δstring {
func KVDiff(kv1, kv2 map[Key]string) map[Key]Δstring {
delta := map[Key]Δstring{}
keys := setKey{}
for k := range kv1 { keys.Add(k) }
......@@ -51,8 +51,8 @@ func kvdiff(kv1, kv2 map[Key]string) map[Key]Δstring {
return delta
}
// kvtxt returns string representation of {} kv.
func kvtxt(kv map[Key]string) string {
// KVTxt returns string representation of {} kv.
func KVTxt(kv map[Key]string) string {
if len(kv) == 0 {
return "ø"
}
......
......@@ -27,7 +27,7 @@ import (
func TestKVDiff(t *testing.T) {
kv1 := map[Key]string{1:"a", 3:"c", 4:"d"}
kv2 := map[Key]string{1:"b", 4:"d", 5:"e"}
got := kvdiff(kv1, kv2)
got := KVDiff(kv1, kv2)
want := map[Key]Δstring{1:{"a","b"}, 3:{"c",DEL}, 5:{DEL,"e"}}
if !reflect.DeepEqual(got, want) {
t.Fatalf("error:\ngot: %v\nwant: %v", got, want)
......@@ -36,7 +36,7 @@ func TestKVDiff(t *testing.T) {
func TestKVTxt(t *testing.T) {
kv := map[Key]string{3:"hello", 1:"zzz", 4:"world"}
got := kvtxt(kv)
got := KVTxt(kv)
want := "1:zzz,3:hello,4:world"
if got != want {
t.Fatalf("error:\ngot: %q\nwant: %q", got, want)
......
......@@ -104,5 +104,5 @@ func (xkv RBucketSet) Flatten() map[Key]string {
}
func (b *RBucket) String() string {
return fmt.Sprintf("%sB%s{%s}", b.Keycov, b.Oid, kvtxt(b.KV))
return fmt.Sprintf("%sB%s{%s}", b.Keycov, b.Oid, KVTxt(b.KV))
}
// Copyright (C) 2020-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package xbtreetest
// testing-related support
import (
"flag"
"math/rand"
"testing"
"time"
)
var (
verylongFlag = flag.Bool("verylong", false, `switch tests to run in "very long" mode`)
randseedFlag = flag.Int64("randseed", -1, `seed for random number generator`)
)
// N returns short, medium, or long depending on whether tests were ran with
// -short, -verylong, or normally.
func N(short, medium, long int) int {
// -short
if testing.Short() {
return short
}
// -verylong
if *verylongFlag {
return long
}
// default
return medium
}
// NewRand returns new random-number generator and seed that was used to initialize it.
//
// The seed can be controlled via -randseed optiong.
func NewRand() (rng *rand.Rand, seed int64) {
seed = *randseedFlag
if seed == -1 {
seed = time.Now().UnixNano()
}
rng = rand.New(rand.NewSource(seed))
return rng, seed
}
......@@ -35,7 +35,7 @@ import (
// T is tree-based testing environment.
//
// It combines TreeSrv and client side access to ZODB with committed trees.
// It should be created it via NewT().
// It should be created it NewT().
type T struct {
*testing.T
......@@ -56,8 +56,13 @@ type Commit struct {
ΔZ *zodb.EventCommit // raw ZODB changes; δZ.tid == at
Xkv RBucketSet // full tree state as of @at
Δxkv map[Key]Δstring // full tree-diff against parent
zblkDataTab map[zodb.Oid]string // full snapshot of all ZBlk data @at
// δzblkData map[zodb.Oid]Δstring // full diff for zblkData against parent XXX ?
ZBlkTab map[zodb.Oid]ZBlkInfo // full snapshot of all ZBlk name/data @at
}
// ZBlkInfo describes one ZBlk object.
type ZBlkInfo struct {
Name string // this ZBlk comes under root['treegen/values'][Name]
Data string
}
// NewT creates new T.
......@@ -97,7 +102,7 @@ func NewT(t *testing.T) *T {
Prev: nil,
At: head,
Xkv: xGetTree(tt.DB, head, tt.Root()),
zblkDataTab: xGetBlkDataTab(tt.DB, head),
ZBlkTab: xGetBlkTab(tt.DB, head),
ΔZ: nil,
Δxkv: nil,
}
......@@ -182,26 +187,26 @@ func (t *T) CommitTree(tree string) *Commit {
At: δZ.Tid,
ΔZ: δZ,
Xkv: xkv,
zblkDataTab: xGetBlkDataTab(t.DB, δZ.Tid),
ZBlkTab: xGetBlkTab(t.DB, δZ.Tid),
}
tprev := t.Head()
ttree.Prev = tprev
ttree.Δxkv = kvdiff(tprev.Xkv.Flatten(), ttree.Xkv.Flatten())
ttree.Δxkv = KVDiff(tprev.Xkv.Flatten(), ttree.Xkv.Flatten())
t.commitv = append(t.commitv, ttree)
return ttree
}
// xGetBlkDataTab loads all ZBlk from db@at.
// xGetBlkTab loads all ZBlk from db@at.
//
// it returns {} oid -> blkdata.
func xGetBlkDataTab(db *zodb.DB, at zodb.Tid) map[zodb.Oid]string {
defer exc.Contextf("%s: @%s: get blkdatatab", db.Storage().URL(), at)
func xGetBlkTab(db *zodb.DB, at zodb.Tid) map[zodb.Oid]ZBlkInfo {
defer exc.Contextf("%s: @%s: get blktab", db.Storage().URL(), at)
X := exc.Raiseif
blkDataTab := map[zodb.Oid]string{}
blkTab := map[zodb.Oid]ZBlkInfo{}
txn, ctx := transaction.New(context.Background())
defer txn.Abort()
......@@ -228,18 +233,23 @@ func xGetBlkDataTab(db *zodb.DB, at zodb.Tid) map[zodb.Oid]string {
err = zblkdir.PActivate(ctx); X(err)
defer zblkdir.PDeactivate()
for k, xzblk := range zblkdir.Data {
for xname, xzblk := range zblkdir.Data {
name, ok := xname.(string)
if !ok {
exc.Raisef("root['treegen/values']: key [%q]: expected str, got %T", xname, xname)
}
zblk, ok := xzblk.(zodb.IPersistent)
if !ok {
exc.Raisef("root['treegen/values'][%q]: expected %s, got %s", k, xzodb.TypeOf(zblk), xzodb.TypeOf(xzblk))
exc.Raisef("root['treegen/values'][%q]: expected IPersistent, got %s", name, xzodb.TypeOf(xzblk))
}
oid := zblk.POid()
data := xzgetBlkData(ctx, zconn, oid)
blkDataTab[oid] = data
blkTab[oid] = ZBlkInfo{name, data}
}
return blkDataTab
return blkTab
}
// XGetBlkData loads blk data for ZBlk<oid> @t.at
......@@ -249,13 +259,23 @@ func (t *Commit) XGetBlkData(oid zodb.Oid) string {
if oid == VDEL {
return DEL
}
data, ok := t.zblkDataTab[oid]
zblki, ok := t.ZBlkTab[oid]
if !ok {
exc.Raisef("getBlkData ZBlk<%s> @%s: no such ZBlk", oid, t.At)
}
return data
return zblki.Data
}
// XGetBlkByName returns ZBlk info associated with ZBlk<name>
func (t *Commit) XGetBlkByName(name string) (zodb.Oid, ZBlkInfo) {
for oid, zblki := range t.ZBlkTab {
if zblki.Name == name {
return oid, zblki
}
}
panicf("ZBlk<%q> not found", name)
return zodb.InvalidOid, ZBlkInfo{} // XXX should be not needed
}
// xGetTree loads Tree from zurl@at->obj<root>.
//
......
......@@ -69,8 +69,8 @@ type AllStructsSrv struct {
*TreeGenSrv
}
// StartTreeGenSrv spawns `treegen ...` server.
func StartTreeGenSrv(argv ...string) (_ *TreeGenSrv, hello string, err error) {
// startTreeGenSrv spawns `treegen ...` server.
func startTreeGenSrv(argv ...string) (_ *TreeGenSrv, hello string, err error) {
defer xerr.Contextf(&err, "treesrv %v: start", argv)
// spawn `treegen ...`
......@@ -125,7 +125,7 @@ func (tg *TreeGenSrv) Close() (err error) {
// StartTreeSrv spawns `treegen trees` server.
func StartTreeSrv(zurl string) (_ *TreeSrv, err error) {
defer xerr.Contextf(&err, "tree.srv %s: start", zurl)
tgSrv, hello, err := StartTreeGenSrv("trees", zurl)
tgSrv, hello, err := startTreeGenSrv("trees", zurl)
if err != nil {
return nil, err
}
......@@ -160,7 +160,7 @@ func StartTreeSrv(zurl string) (_ *TreeSrv, err error) {
func StartAllStructsSrv() (_ *AllStructsSrv, err error) {
defer xerr.Context(&err, "allstructs.srv: start")
tgSrv, hello, err := StartTreeGenSrv("allstructs")
tgSrv, hello, err := startTreeGenSrv("allstructs")
if err != nil {
return nil, err
}
......@@ -209,7 +209,7 @@ func (tg *TreeSrv) Commit(tree string) (_ zodb.Tid, err error) {
// AllStructs returns response from `treegen allstructs`
func (tg *AllStructsSrv) AllStructs(kv map[Key]string, maxdepth, maxsplit, n int, seed int64) (_ []string, err error) {
req := fmt.Sprintf("%d %d %d/%d %s", maxdepth, maxsplit, n, seed, kvtxt(kv))
req := fmt.Sprintf("%d %d %d/%d %s", maxdepth, maxsplit, n, seed, KVTxt(kv))
defer xerr.Contextf(&err, "allstructs.srv: %s ", req)
_, err = io.WriteString(tg.pyin, req + "\n")
......
......@@ -20,7 +20,7 @@
# See https://www.nexedi.com/licensing for rationale and options.
"""Program treegen provides infrastructure to generate ZODB BTree states.
It is used as helper for ΔBtail tests.
It is used as helper for ΔBtail and ΔFtail tests.
The following subcommands are provided:
......@@ -39,7 +39,7 @@ trees
-----
`treegen trees <zurl>` transitions ZODB LOBTree through requested tree states.
Tree states are specified on stdin as topology-encoded strings, 1 state per 1 line.
Tree states are specified on stdin as topology-encoded strings(+), 1 state per 1 line.
For every request the tree is changed to have specified keys, values and
topology, and modifications are committed to database. For every made commit
corresponding transaction ID is printed to stdout.
......@@ -65,6 +65,8 @@ session example:
S: 03d85dd871718899
...
XXX describe ø command
allstructs
----------
......@@ -108,9 +110,17 @@ session example:
T3/T-T/B1:a,2:b-B3:c
# ----
ΔFtail support
--------------
XXX describe øf and `t... D...` commands.
--------
(*) 300-500ms, see https://github.com/pypa/setuptools/issues/510.
(+) see wcfs/internal/xbtree.py
"""
from __future__ import print_function, absolute_import
......@@ -127,7 +137,7 @@ import random
import six
from wendelin.wcfs.internal import xbtree, xbtree_test
from wendelin.bigfile.file_zodb import ZBlk
from wendelin.bigfile.file_zodb import ZBlk, ZBigFile
from zodbtools.util import storageFromURL, ashex
from persistent import CHANGED
......@@ -197,6 +207,9 @@ def TreesSrv(zstor, r):
defer(zctx.close)
ztree = zctx.root['treegen/tree'] = LOBTree()
zfile = zctx.root['treegen/file'] = ZBigFile(blksize=4) # for ΔFtail tests
zfile.blktab = ztree
zdummy = zctx.root['treegen/dummy'] = PersistentMapping() # anything for ._p_changed=True
head = commit('treegen/tree: init')
xprint("tree.srv start @%s root=%s" % (ashex(head), ashex(ztree._p_oid)))
treetxtPrev = zctx.ztreetxt(ztree)
......@@ -210,10 +223,57 @@ def TreesSrv(zstor, r):
xprint("%s" % ashex(head))
continue
# mark tree as changed if the same topology is requested twice.
# øf command to delete the file
if treetxt == "øf":
head = commitDelete(zfile, subj)
xprint("%s" % ashex(head))
continue
# make sure we continue with undeleted ztree/zfile
if deleted(ztree):
undelete(ztree)
if deleted(zfile):
undelete(zfile)
# t... D... commands to natively commit updates to tree and values
if treetxt.startswith('t'):
t, D = treetxt.split()
assert D.startswith('D')
kv = kvDecode(t[1:], zctx.vdecode)
zv = _kvDecode(D[1:], kdecode=lambda ktxt: ktxt, vdecode=lambda vtxt: vtxt)
patch(ztree, diff(ztree, kv), kv)
# ~ patch(valdict, diff(valdict,zv)) but sets zblk.value on change
valdict = zctx.root['treegen/values']
vkeys = set(valdict.keys())
vkeys.update(zv.keys())
for k in vkeys:
zblk = valdict.get(k)
v1 = None
if zblk is not None:
v1 = zblk.loadblkdata()
v2 = zv.get(k)
if v1 != v2:
if v1 is None:
zblk = ZBlk()
valdict[k] = zblk
if v2 is not None:
zblk.setblkdata(v2)
zblk._p_changed = True
elif v2 is None:
del valdict[k]
zdummy._p_changed = True # alayws non-empty commit
head = commit(subj)
xprint("%s" % ashex(head))
continue
# everything else is considerd to be a tree topology
# mark something as changed if the same topology is requested twice.
# this ensures we can actually make a non-empty commit
if treetxt == treetxtPrev:
ztree._p_changed = True
zdummy._p_changed = True
treetxtPrev = treetxt
tree = zctx.TopoDecode(treetxt)
......@@ -342,12 +402,15 @@ def kvEncode(kvDict, vencode): # -> kvText
# kvDecode decodes key->value mapping from text.
# e.g. '1:a,2:b' -> {1:'a', 2:'b'}
def kvDecode(kvText, vdecode): # -> kvDict
if kvText == "":
return _kvDecode(kvText, int, vdecode)
def _kvDecode(kvText, kdecode, vdecode): # -> kvDict
if kvText in ("", "ø"):
return {}
kv = {}
for item in kvText.split(','):
ktxt, vtxt = item.split(':')
k = int(ktxt)
k = kdecode(ktxt)
v = vdecode(vtxt)
if k in kv:
raise ValueError("key %s present multiple times" % k)
......@@ -372,7 +435,7 @@ def diff(d1, d2): # -> [] of (k,v) to change; DEL means del[k]
# diff = [] of (k,v) to change; DEL means del[k]
def patch(d, diff, verify):
for (k,v) in diff:
if v is DEL:
if v == DEL:
del d[k]
else:
d[k] = v
......@@ -431,8 +494,18 @@ def commitDelete(obj, description): # -> tid
# reset transaction to a new one
transaction.begin()
obj._v_deleted = True
return tid
# deleted reports whether obj was deleted via commitDelete.
def deleted(obj): # -> bool
return getattr(obj, '_v_deleted', False)
# undelete forces recreation for obj that was previously deleted via commitDelete.
def undelete(obj):
obj._p_changed = True
del obj._v_deleted
# ztreetxt returns text representation of a ZODB tree.
@func(ZCtx)
......
This diff is collapsed.
This diff is collapsed.
......@@ -24,8 +24,10 @@ import (
"context"
"errors"
"fmt"
"reflect"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -82,6 +84,33 @@ func ZOpen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn,
}, nil
}
// ZGetOrNil returns zconn.Get(oid), or (nil,ok) if the object does not exist.
func ZGetOrNil(ctx context.Context, zconn *zodb.Connection, oid zodb.Oid) (_ zodb.IPersistent, err error) {
defer xerr.Contextf(&err, "zget %s@%s", oid, zconn.At())
obj, err := zconn.Get(ctx, oid)
if err != nil {
if IsErrNoData(err) {
err = nil
}
return nil, err
}
// activate the object to find out it really exists
// after removal on storage, the object might have stayed in Connection
// cache due to e.g. PCachePinObject, and it will be PActivate that
// will return "deleted" error.
err = obj.PActivate(ctx)
if err != nil {
if IsErrNoData(err) {
return nil, nil
}
return nil, err
}
obj.PDeactivate()
return obj, nil
}
// IsErrNoData returns whether err is due to NoDataError or NoObjectError.
func IsErrNoData(err error) bool {
var eNoData *zodb.NoDataError
......@@ -96,3 +125,12 @@ func IsErrNoData(err error) bool {
return false
}
}
// XidOf return string representation of object xid.
func XidOf(obj zodb.IPersistent) string {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return "ø"
}
xid := zodb.Xid{At: obj.PJar().At(), Oid: obj.POid()}
return xid.String()
}
......@@ -390,9 +390,9 @@ func (bf *zBigFileState) PySetState(pystate interface{}) (err error) {
return fmt.Errorf("blksize: must be > 0; got %d", blksize)
}
blktab, ok := t[1].(*btree.LOBTree)
if !ok {
return fmt.Errorf("blktab: expect LOBTree; got %s", xzodb.TypeOf(t[1]))
blktab, err := vBlktab(t[1])
if err != nil {
return err
}
bf.blksize = blksize
......@@ -437,9 +437,9 @@ func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, treePath
return make([]byte, bf.blksize), treePath, nil, blkRevMax, nil
}
zblk, ok = xzblk.(ZBlk)
if !ok {
return nil, nil, nil, 0, fmt.Errorf("expect ZBlk*; got %s", xzodb.TypeOf(xzblk))
zblk, err = vZBlk(xzblk)
if err != nil {
return nil, nil, nil, 0, err
}
blkdata, zblkrev, err := zblk.LoadBlkData(ctx)
......@@ -493,6 +493,23 @@ func (bf *ZBigFile) Size(ctx context.Context) (_ int64, treePath []btree.LONode,
return size, treePath, nil
}
// vZBlk checks and converts xzblk to a ZBlk object.
func vZBlk(xzblk interface{}) (ZBlk, error) {
zblk, ok := xzblk.(ZBlk)
if !ok {
return nil, fmt.Errorf("expect ZBlk*; got %s", xzodb.TypeOf(xzblk))
}
return zblk, nil
}
// vBlktab checks and converts xblktab to LOBTree object.
func vBlktab(xblktab interface{}) (*btree.LOBTree, error) {
blktab, ok := xblktab.(*btree.LOBTree)
if !ok {
return nil, fmt.Errorf("blktab: expect LOBTree; got %s", xzodb.TypeOf(xblktab))
}
return blktab, nil
}
// ----------------------------------------
......
This diff is collapsed.
This diff is collapsed.
// Copyright (C) 2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zdata_test
import (
_ "lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xbtree/xbtreetest/init"
)
......@@ -368,7 +368,7 @@ package main
// rev(blk) ≤ rev'(blk) rev'(blk) = min(^^^)
//
//
// XXX we delay recomputing δFtail.LastBlkRev(file, #blk, head) because
// XXX we delay recomputing δFtail.BlkRevAt(file, #blk, head) because
// using just cheap revmax estimate can frequently result in all watches
// being skipped.
//
......@@ -868,7 +868,8 @@ retry:
// invalidate kernel cache for data in changed files
// NOTE no δFmu lock needed because zhead is WLocked
δF, err := bfdir.δFtail.Update(δZ, zhead) // δF <- δZ |tracked
// δF, err := bfdir.δFtail.Update(δZ, zhead) // δF <- δZ |tracked
δF, err := bfdir.δFtail.Update(δZ) // δF <- δZ |tracked
if err != nil {
return err
}
......@@ -881,23 +882,34 @@ retry:
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
})
size := " "
flags := ""
if δfile.Size {
size = "S"
flags += "S"
}
log.Infof("S: \t- %s\t%s %v\n", foid, size, blkv)
if δfile.Epoch {
flags += "E"
}
log.Infof("S: \t- %s\t%2s %v\n", foid, flags, blkv)
}
log.Infof("\n\n")
}
// invalidate kernel cache for file data
wg := xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile {
// // XXX needed?
// // XXX even though δBtail is complete, not all ZBlk are present here
// file.δtail.Append(δF.Rev, δfile.Blocks.Elements())
// file was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[foid]
if δfile.Epoch {
// XXX while invalidating whole file at epoch is easy,
// it becomes not so easy to handle isolation if epochs
// could be present. For this reason we forbid changes
// to ZBigFile objects for now.
return fmt.Errorf("ZBigFile<%s> changed @%s", foid, δF.Rev)
// wg.Go(func(ctx context.Context) error {
// return file.invalidateAll() // NOTE does not accept ctx
// })
} else {
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
......@@ -905,6 +917,7 @@ retry:
})
}
}
}
err = wg.Wait()
if err != nil {
return err
......@@ -948,6 +961,7 @@ retry:
file := bfdir.fileTab[foid] // must be present
zfile := file.zfile
// XXX need to do only if δfile.Size changed
size, sizePath, err := zfile.Size(ctx)
if err != nil {
return err
......@@ -973,7 +987,6 @@ retry:
}
// XXX δFtail.ForgetPast(...)
// XXX for f in δF: f.δtail.ForgetPast(...)
// notify zhead.At waiters
for hw := range head.hwait {
......@@ -1074,7 +1087,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
func() {
// store retrieved data back to OS cache for file @<rev>/file[blk]
δFtail := f.head.bfdir.δFtail
blkrev, _ := δFtail.LastBlkRev(ctx, f.zfile, blk, f.head.zconn.At())
blkrev, _ := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid())
if err != nil {
log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
......@@ -1112,6 +1125,21 @@ func (f *BigFile) invalidateAttr() (err error) {
return nil
}
// invalidateAll invalidates file attributes and all file data in kernel cache.
//
// complements invalidateAttr and invalidateBlk and is used to completely reset
// kernel file cache on ΔFtail epoch.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAll() (err error) {
defer xerr.Contextf(&err, "%s: invalidate all", f.path())
fsconn := gfsconn
st := fsconn.FileNotify(f.Inode(), 0, -1) // metadata + all data
if st != fuse.OK {
return syscall.Errno(st)
}
return nil
}
// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock.
......@@ -1291,7 +1319,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// and thus would trigger DB access again.
//
// TODO if direct-io: don't touch pagecache
// TODO upload parts only not covered by currrent read (not to e.g. wait for page lock)
// TODO upload parts only not covered by current read (not to e.g. wait for page lock)
// TODO skip upload completely if read is wide to cover whole blksize
go f.uploadBlk(blk, loading)
......@@ -1537,7 +1565,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// we'll relock atMu again and recheck blkrev vs w.at after.
w.atMu.RUnlock()
blkrev, _ = δFtail.LastBlkRev(ctx, f.zfile, blk, f.head.zconn.At())
blkrev, _ = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
blkrevRough = false
w.atMu.RLock()
......@@ -1553,7 +1581,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
pinrev, _ := δFtail.LastBlkRev(ctx, w.file.zfile, blk, w.at) // XXX move into go?
pinrev, _ := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go?
// XXX ^^^ w.file vs f ?
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
......@@ -1681,7 +1709,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// rlocked during pin setup.
//
// δ δ
// ----x----.------------]----x----
// ────x────.────────────]────x────
// ↑ ↑
// w.at head
//
......@@ -1700,6 +1728,21 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
// the file if w.at is before that time:
//
// create file
// ────.────────x────────]────
// ↑ ↑
// w.at head
//
// but then the file should not be normally accessed in that case.
//
// -> reject such watches with an error
return fmt.Errorf("file epoch detected @%s in between (at,head=@%s]", δfile.Rev, headAt)
}
for blk := range δfile.Blocks {
_, already := toPin[blk]
if already {
......@@ -1714,13 +1757,13 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// XXX adjust wcfs tests to not require only accessed
// blocks to be in setup pins? But that would mean that
// potentially more blocks would be potentially
// _unneccessarily_ pinned if they are not going to be
// _unnecessarily_ pinned if they are not going to be
// accessed at all.
if !f.accessed.Has(blk) {
continue
}
toPin[blk], _ = δFtail.LastBlkRev(ctx, f.zfile, blk, at) // XXX err
toPin[blk], _ = δFtail.BlkRevAt(ctx, f.zfile, blk, at) // XXX err
}
}
......@@ -2088,7 +2131,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
root.revMu.Unlock()
if already {
// XXX race wrt simlutaneous "FORGET @<rev>" ?
// XXX race wrt simultaneous "FORGET @<rev>" ?
return revDir, nil
}
......@@ -2533,7 +2576,7 @@ func _main() (err error) {
}
// wait for unmount
// XXX the kernel does not sentd FORGETs on unmount - release left node resources ourselves?
// XXX the kernel does not send FORGETs on unmount - release left node resources ourselves?
<-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl)
return nil // XXX serveErr | zwatchErr ?
......
......@@ -1132,7 +1132,8 @@ def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
#
# If the block is hole - (b'', at0) is returned. XXX -> @z64?
# XXX ret for when the file did not existed at all? blk was after file size?
# XXX ret for when the file did not existed at all?
# XXX ret ----//---- blk was after file size?
@func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment