Commit b5e4e424 authored by Kirill Smelkov

Merge branch 't2' into t

* t2: (44 commits)
  .
  X wcfs: hook in δFtail.Forget
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  ...
parents c5341182 2ffa7d57
......@@ -183,6 +183,7 @@
// (*) see "Wcfs locking organization" in wcfs.go
// (%) see related comment in Conn.__pin1 for details.
// Handling of fork
//
// When a process calls fork, OS copies its memory and creates child process
......
......@@ -160,7 +160,7 @@ cdef extern from "<fcntl.h>" nogil:
int posix_fadvise(int fd, off_t offset, off_t len, int advice);
enum: POSIX_FADV_DONTNEED
# fadvise_dontneed teels the kernel that file<fd>[offset +len) is not needed.
# fadvise_dontneed tells the kernel that file<fd>[offset +len) is not needed.
#
# see fadvise(2) for details.
def fadvise_dontneed(int fd, off_t offset, off_t len):
......
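The hunk above documents the Cython wrapper; for comparison, the same page-cache hint can be issued from Go. A minimal standalone sketch, assuming golang.org/x/sys/unix — this helper is illustrative and is not part of wcfs:

package main

import (
	"log"
	"os"

	"golang.org/x/sys/unix"
)

// dropPageCache tells the kernel that file[offset, offset+length) is not
// needed, so those pages may be evicted from the OS page cache.
// See fadvise(2) for details.
func dropPageCache(f *os.File, offset, length int64) error {
	return unix.Fadvise(int(f.Fd()), offset, length, unix.FADV_DONTNEED)
}

func main() {
	f, err := os.Open("/etc/hostname") // hypothetical example path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if err := dropPageCache(f, 0, 4096); err != nil {
		log.Fatal(err)
	}
}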
......@@ -118,6 +118,6 @@ func TestPPTreeSubSetOps(t *testing.T) {
assert1("difference", tt.A, tt.A, Daa, S{})
assert1("difference", tt.B, tt.B, Dbb, S{})
// XXX also verify U/D properties like (A+B)\B + (A+B)\A + (A^B) == (A+B) ?
// TODO also verify U/D properties like (A+B)\B + (A+B)\A + (A^B) == (A+B) ?
}
}
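The property mentioned in the TODO above, (A+B)\B + (A+B)\A + (A^B) == (A+B), is a plain set identity. A standalone sketch checking it with map-based sets, deliberately not guessing this package's own set API:

package main

import "fmt"

type set map[int64]struct{}

func union(a, b set) set {
	r := set{}
	for k := range a {
		r[k] = struct{}{}
	}
	for k := range b {
		r[k] = struct{}{}
	}
	return r
}

func difference(a, b set) set {
	r := set{}
	for k := range a {
		if _, in := b[k]; !in {
			r[k] = struct{}{}
		}
	}
	return r
}

func intersection(a, b set) set {
	r := set{}
	for k := range a {
		if _, in := b[k]; in {
			r[k] = struct{}{}
		}
	}
	return r
}

func equal(a, b set) bool {
	if len(a) != len(b) {
		return false
	}
	for k := range a {
		if _, in := b[k]; !in {
			return false
		}
	}
	return true
}

func main() {
	A := set{1: {}, 2: {}, 3: {}}
	B := set{3: {}, 4: {}}
	AuB := union(A, B)
	// (A+B)\B + (A+B)\A + (A^B) == (A+B)
	lhs := union(union(difference(AuB, B), difference(AuB, A)), intersection(A, B))
	fmt.Println(equal(lhs, AuB)) // true
}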
......@@ -194,6 +194,8 @@ type _ΔFileTail struct {
vδE []_ΔFileEpoch // epochs (changes to ZBigFile object itself) ; nil if not yet rebuilt
rebuildJob *_RebuildJob // !nil if vδE rebuild is currently in-progress
btrackReqSet setI64 // set of blocks explicitly requested to be tracked in this file
}
// _ΔFileEpoch represent a change to ZBigFile object.
......@@ -260,7 +262,8 @@ func (δFtail *ΔFtail) Tail() zodb.Tid { return δFtail.δBtail.Tail() }
// One root can be associated with several files (each provided on different Track calls).
//
// zblk can be nil, which represents a hole.
// if zblk is nil -> blk is ignored and can be arbitrary.
// blk can be < 0, which requests not to establish file[blk] -> zblk
// association. zblk must be nil in this case.
//
// Objects in path and zblk must be with .PJar().At() == .head
func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, blkcov btree.LKeyRange, zblk ZBlk) {
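The two call shapes admitted by this contract are both used later in this patch (in readBlk and in handleδZ/bigfopen). A side-by-side fragment, not runnable on its own, with names as they appear in the surrounding code:

	// 1) track a concrete block together with the ZBlk object it currently maps to:
	δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)

	// 2) track only the size/btree path of the file, with no blk↔zblk
	//    association: blk < 0 requests no association and zblk must be nil.
	δFtail.Track(f.zfile, -1, sizePath, blkCov, nil)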
......@@ -299,7 +302,11 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl
δftail, ok := δFtail.byFile[foid]
if !ok {
δftail = &_ΔFileTail{root: root, vδE: nil /*will need to be rebuilt to past till tail*/}
δftail = &_ΔFileTail{
root: root,
vδE: nil /*will need to be rebuilt to past till tail*/,
btrackReqSet: setI64{},
}
δFtail.byFile[foid] = δftail
δFtail.ftrackNew.Add(foid)
}
......@@ -307,10 +314,16 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl
// .root can change during epochs, but in between them it must be stable
panicf("BUG: zfile<%s> root mutated from %s -> %s", foid, δftail.root, root)
}
if blk >= 0 {
δftail.btrackReqSet.Add(blk)
}
// associate zblk with root, if it was not hole
if zblk != nil {
if blk < 0 {
panicf("BUG: zfile<%s>: blk=%d, but zblk != nil", foid, blk)
}
zoid := zblk.POid()
inroot, ok := δFtail.ztrackInRoot[zoid]
......@@ -333,15 +346,18 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl
//
// It builds vδE for that file if there is such need.
// The only case when vδE actually needs to be built is when the file just started to be tracked.
func (δFtail *ΔFtail) vδEForFile(foid zodb.Oid) (vδE []_ΔFileEpoch, headRoot zodb.Oid, err error) {
//
// It also returns δftail for convenience.
// NOTE access to returned δftail must be protected via δFtail.mu.
func (δFtail *ΔFtail) vδEForFile(foid zodb.Oid) (vδE []_ΔFileEpoch, headRoot zodb.Oid, δftail *_ΔFileTail, err error) {
δFtail.mu.Lock() // TODO verify that there is no in-progress writers
defer δFtail.mu.Unlock()
δftail := δFtail.byFile[foid]
δftail = δFtail.byFile[foid]
root := δftail.root
vδE = δftail.vδE
if vδE != nil {
return vδE, root, nil
return vδE, root, δftail, nil
}
// vδE needs to be built
......@@ -355,7 +371,7 @@ func (δFtail *ΔFtail) vδEForFile(foid zodb.Oid) (vδE []_ΔFileEpoch, headRoo
δFtail.mu.Lock()
vδE = δftail.vδE
}
return vδE, root, job.err
return vδE, root, δftail, job.err
}
// we become responsible to build vδE
......@@ -379,7 +395,7 @@ func (δFtail *ΔFtail) vδEForFile(foid zodb.Oid) (vδE []_ΔFileEpoch, headRoo
job.err = err
close(job.ready)
return vδE, root, err
return vδE, root, δftail, err
}
// _rebuildAll rebuilds vδE for all files from ftrackNew requests.
......@@ -473,6 +489,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
// NOTE no need to clone vδE: we are writer, vδE is never returned to
// outside, append does not invalidate previous vδE retrievals.
δftail.vδE = append(δftail.vδE, δE)
δftail.btrackReqSet = setI64{}
}
}
......@@ -674,16 +691,32 @@ type _ZinblkOverlay struct {
//
// Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) (/*readonly*/[]*ΔFile, error) {
return δFtail.SliceByFileRevEx(zfile, lo, hi, QueryOptions{})
}
// SliceByFileRevEx is extended version of SliceByFileRev with options.
func (δFtail *ΔFtail) SliceByFileRevEx(zfile *ZBigFile, lo, hi zodb.Tid, opt QueryOptions) (/*readonly*/[]*ΔFile, error) {
foid := zfile.POid()
//fmt.Printf("\nslice f<%s> (@%s,@%s]\n", foid, lo, hi)
vδf, err := δFtail._SliceByFileRev(foid, lo, hi)
vδf, err := δFtail._SliceByFileRev(foid, lo, hi, opt)
if err != nil {
err = fmt.Errorf("slice f<%s> (@%s,@%s]: %e", foid, lo, hi, err)
}
return vδf, err
}
func (δFtail *ΔFtail) _SliceByFileRev(foid zodb.Oid, lo, hi zodb.Tid) (/*readonly*/[]*ΔFile, error) {
// QueryOptions represents options for SliceBy* queries.
type QueryOptions struct {
// OnlyExplicitlyTracked requests that only blocks that were
// explicitly tracked are included into the result.
//
// By default SliceBy* return information about both blocks that
// were explicitly tracked, and blocks that became tracked due to being
// adjacent to a tracked block in BTree bucket.
OnlyExplicitlyTracked bool
}
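A usage sketch of the extended query; it mirrors how setupWatch uses it later in this patch. Note the slice range is (lo, hi] — lo exclusive, hi inclusive. Fragment only; zfile, lo and hi come from the caller:

	vδf, err := δFtail.SliceByFileRevEx(zfile, lo, hi, QueryOptions{
		// return only blocks that were explicitly Track'ed, not blocks
		// that became tracked merely by sharing a BTree bucket with a
		// tracked block
		OnlyExplicitlyTracked: true,
	})
	if err != nil {
		return err
	}
	for _, δf := range vδf {
		_ = δf.Rev    // revision at which the change happened
		_ = δf.Blocks // blocks changed at that revision
	}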
func (δFtail *ΔFtail) _SliceByFileRev(foid zodb.Oid, lo, hi zodb.Tid, opt QueryOptions) (/*readonly*/[]*ΔFile, error) {
xtail.AssertSlice(δFtail, lo, hi)
// query .δBtail.SliceByRootRev(file.blktab, lo, hi) +
......@@ -703,7 +736,7 @@ func (δFtail *ΔFtail) _SliceByFileRev(foid zodb.Oid, lo, hi zodb.Tid) (/*reado
// δFile ────────o───────o──────x─────x────────────────────────
vδE, headRoot, err := δFtail.vδEForFile(foid)
vδE, headRoot, δftail, err := δFtail.vδEForFile(foid)
if err != nil {
return nil, err
}
......@@ -926,6 +959,40 @@ func (δFtail *ΔFtail) _SliceByFileRev(foid zodb.Oid, lo, hi zodb.Tid) (/*reado
vδf[i], vδf[j] = vδf[j], vδf[i]
}
// take opt.OnlyExplicitlyTracked into account
// XXX epochs not handled (currently ok as epochs are rejected by wcfs)
if opt.OnlyExplicitlyTracked {
δblk := setI64{}
for _, δf := range vδf {
δblk.Update(δf.Blocks)
}
δFtail.mu.Lock()
for blk := range δblk {
if !δftail.btrackReqSet.Has(blk) {
δblk.Del(blk)
}
}
δFtail.mu.Unlock()
for i := len(vδf)-1; i >= 0; i-- {
δf := vδf[i]
if δf.Epoch {
continue
}
for blk := range δf.Blocks {
if !δblk.Has(blk) {
δf.Blocks.Del(blk)
}
}
if len(δf.Blocks) == 0 {
// delete @i
copy(vδf[i:], vδf[i+1:])
vδf = vδf[:len(vδf)-1]
}
}
}
return vδf, nil
}
......@@ -1017,7 +1084,7 @@ func (δFtail *ΔFtail) BlkRevAt(ctx context.Context, zfile *ZBigFile, blk int64
panicf("zconn.at out of bounds: zconn.at: @%s, (tail, head] = (@%s, @%s]", zconnAt, tail, head)
}
vδE, headRoot, err := δFtail.vδEForFile(foid)
vδE, headRoot, _, err := δFtail.vδEForFile(foid)
if err != nil {
return zodb.InvalidTid, false, err
}
......
......@@ -470,7 +470,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
trackZinroot := map[string]setOid{}
for zoid, inroot := range δFtail.ztrackInRoot {
zblki := commit.ZBlkTab[zoid]
trackZinroot[zblki.Name] = inroot.Clone() // XXX clone needed?
trackZinroot[zblki.Name] = inroot
}
Zinroot := map[string]setOid{}
for zblk := range Zinblk {
......@@ -494,7 +494,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
} else {
for zoid, inblk := range rt.ztrackInBlk {
zblki := commit.ZBlkTab[zoid]
trackZinblk[zblki.Name] = inblk.Clone() // XXX clone needed?
trackZinblk[zblki.Name] = inblk
}
}
......@@ -609,6 +609,8 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
// SliceByFileRev returns all changes to that untracked block. In other words
// we verify that no change to untracked block is missed, if any change to that
// block is ever present in returned slice.
//
// This test also verifies handling of OnlyExplicitlyTracked query option.
func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
t := newT(t_)
X := exc.Raiseif
......@@ -651,37 +653,48 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// blktab[2] remains unnoticed because it is not changed past at1.
xtrackBlk(0)
// (at1, at4] -> changes to both 0 and 1, because they both are changed in the same bucket @at2
lo := t1.At
hi := t4.At
vδf, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok := []*ΔFile{
&ΔFile{Rev: t2.At, Blocks: b(0,1), Size: true},
&ΔFile{Rev: t3.At, Blocks: b(0,1), Size: false},
&ΔFile{Rev: t4.At, Blocks: b( 1), Size: false},
}
if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
// assertSliceByFileRev verifies result of SliceByFileRev and SliceByFileRevEx(OnlyExplicitlyTracked=y).
assertSliceByFileRev := func(lo, hi zodb.Tid, vδf_ok, vδfT_ok []*ΔFile) {
t.Helper()
Tonly := QueryOptions{OnlyExplicitlyTracked: true}
vδf, err := δFtail.SliceByFileRev (zfile, lo, hi); X(err)
vδfT, err := δFtail.SliceByFileRevEx(zfile, lo, hi, Tonly); X(err)
if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
}
if !reflect.DeepEqual(vδfT, vδfT_ok) {
t.Errorf("sliceT (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδfT), t.vδfstr(vδfT_ok))
}
}
// (at1, at4] -> changes to both 0 and 1, because they both are changed in the same bucket @at2
assertSliceByFileRev(t1.At, t4.At,
/*vδf*/ []*ΔFile{
&ΔFile{Rev: t2.At, Blocks: b(0,1), Size: true},
&ΔFile{Rev: t3.At, Blocks: b(0,1), Size: false},
&ΔFile{Rev: t4.At, Blocks: b( 1), Size: false},
},
/*vδfT*/ []*ΔFile{
&ΔFile{Rev: t2.At, Blocks: b(0 ), Size: true},
&ΔFile{Rev: t3.At, Blocks: b(0 ), Size: false},
// no change @at4
})
// (at2, at4] -> changes to only 0, because there is no change to 2 via blktab
lo = t2.At
vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile{
&ΔFile{Rev: t3.At, Blocks: b(0), Size: false},
}
if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
}
assertSliceByFileRev(t2.At, t4.At,
/*vδf*/ []*ΔFile{
&ΔFile{Rev: t3.At, Blocks: b(0), Size: false},
},
/*vδfT*/ []*ΔFile{
&ΔFile{Rev: t3.At, Blocks: b(0), Size: false},
})
// (at3, at4] -> changes to only 0, ----/----
lo = t3.At
vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile(nil)
if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
}
assertSliceByFileRev(t3.At, t4.At,
/*vδf*/ []*ΔFile(nil),
/*vδfT*/ []*ΔFile(nil))
}
......
......@@ -57,10 +57,13 @@ Trees _and_ Buckets nodes - would be required.
-> we took the approach where we send invalidation to client about a block
lazily only when the block is actually accessed.
XXX building δFtail lazily along serving fuse reads during scope of one
transaction is not trivial and creates concurrency bottlenecks if simple
Rejected alternative:
Building δFtail lazily along serving FUSE reads during scope of one
transaction is not trivial and would create concurrency bottlenecks if simple
locking scheme is used. With the main difficulty being to populate tracking set
of δBtree lazily. However as the first approach we can still build complete
of δBtree lazily. However as the first approach we could still build complete
tracking set for a BTree at the time of file open: we need to scan through all
trees but _not_ buckets: this way we'll know oid of all tree nodes: trees _and_
buckets, while avoiding loading buckets makes this approach practical: with
......@@ -69,9 +72,11 @@ require ~ 20 trees to cover 1TB of data. And we can scan those trees very
quickly even if doing so serially. For 1PB of data it will require to scan ~
10⁴ trees. If RTT to load 1 object is ~1ms this will become 10 seconds if done
serially. However if we load all those tree objects in parallel it will be
much less. Still the number of trees to scan is linear to the amount of data
and it would be good to address the shortcoming of doing whole file index scan
later.
much less. Still the number of trees to scan is linear to the amount of data.
-> rejected: ΔFtail and ΔBtail were instead fixed to allow several Track and
queries requests to run in parallel. See "Concurrency" section in ΔFtail/ΔBtail
organization overview.
Changing mmapping while under pagefault is possible
......@@ -107,7 +112,7 @@ We can change a mapping while a page from it is under pagefault:
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n2457
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n1301
- the filesystem server upon receiving the read request can manipulate
- the filesystem server, upon receiving the read request, can manipulate
client's address space. This requires to write-lock client->mm->mmap_sem,
but we can be sure it won't deadlock because the kernel releases it
before waiting (see previous point).
......
......@@ -42,7 +42,7 @@ digraph {
zobj2file -> zblk2file;
zobj2file -> zbtree2file;
zbtree2file -> δBTree [color=grey];
zbtree2file -> δBTree;
// wcfs_simple -> Btree_read;
// wcfs_simple -> ZBlk_read;
......@@ -75,8 +75,8 @@ digraph {
wcfsInvProcess [label="process\nZODB invalidations", style=filled fillcolor=grey95]
zconnCacheGet [label="zonn.\n.Cache.Get", style=filled fillcolor=lightyellow]
zobj2file [label="Z* → file/[]#blk", style=filled fillcolor=grey95]
zblk2file [label="ZBlk*\n↓\nfile/[]#blk", style=filled fillcolor=lightyellow]
zbtree2file [label="BTree/Bucket\n↓\nfile/[]#blk"]
zblk2file [label="ZBlk*\n↓\nfile/[]#blk", style=filled fillcolor=grey95]
zbtree2file [label="BTree/Bucket\n↓\nfile/[]#blk", style=filled fillcolor=grey95]
δBTree [label="δ(BTree)", style=filled fillcolor=grey95]
fuseRetrieveCache [label="FUSE:\nretrieve cache", style=filled fillcolor=lightyellow]
......
......@@ -316,7 +316,7 @@
<!-- zblk2file -->
<g id="node22" class="node">
<title>zblk2file</title>
<ellipse fill="lightyellow" stroke="black" cx="62.23" cy="-109.48" rx="62.45" ry="37.45"/>
<ellipse fill="#f2f2f2" stroke="black" cx="62.23" cy="-109.48" rx="62.45" ry="37.45"/>
<text text-anchor="middle" x="62.23" y="-120.78" font-family="Times,serif" font-size="14.00">ZBlk*</text>
<text text-anchor="middle" x="62.23" y="-105.78" font-family="Times,serif" font-size="14.00"></text>
<text text-anchor="middle" x="62.23" y="-90.78" font-family="Times,serif" font-size="14.00">file/[]#blk</text>
......@@ -330,7 +330,7 @@
<!-- zbtree2file -->
<g id="node23" class="node">
<title>zbtree2file</title>
<ellipse fill="none" stroke="black" cx="222.23" cy="-109.48" rx="79.81" ry="37.45"/>
<ellipse fill="#f2f2f2" stroke="black" cx="222.23" cy="-109.48" rx="79.81" ry="37.45"/>
<text text-anchor="middle" x="222.23" y="-120.78" font-family="Times,serif" font-size="14.00">BTree/Bucket</text>
<text text-anchor="middle" x="222.23" y="-105.78" font-family="Times,serif" font-size="14.00"></text>
<text text-anchor="middle" x="222.23" y="-90.78" font-family="Times,serif" font-size="14.00">file/[]#blk</text>
......@@ -368,8 +368,8 @@
<!-- zbtree2file&#45;&gt;δBTree -->
<g id="edge24" class="edge">
<title>zbtree2file&#45;&gt;δBTree</title>
<path fill="none" stroke="grey" d="M222.23,-71.82C222.23,-63.33 222.23,-54.43 222.23,-46.42"/>
<polygon fill="grey" stroke="grey" points="225.73,-46.15 222.23,-36.15 218.73,-46.15 225.73,-46.15"/>
<path fill="none" stroke="black" d="M222.23,-71.82C222.23,-63.33 222.23,-54.43 222.23,-46.42"/>
<polygon fill="black" stroke="black" points="225.73,-46.15 222.23,-36.15 218.73,-46.15 225.73,-46.15"/>
</g>
<!-- clientInvHandle&#45;&gt;headWatch -->
<g id="edge29" class="edge">
......
......@@ -79,7 +79,6 @@
// @<revX>/ has the following structure:
//
// @<revX>/
// at
// bigfile/ ; bigfiles' data as of revision <revX>
// <oid(ZBigFile1)>
// <oid(ZBigFile2)>
......@@ -114,11 +113,9 @@
// The server sends "ok" reply only after head/at is ≥ requested <at>, and only
// after all initial pin/unpin messages are fully acknowledged by the client.
// The client can start to use mmapped data after it gets "ok".
// The server sends "error" reply if requested <at> is too far away back from
// head/at.
// XXX other errors are possible (e.g. "no such file", or error handling pin).
// XXX error handling pin -> then client is killed?
// XXX if not - specify that watch state is lost after error.
// The server sends "error" reply e.g. if requested <at> is too far away back
// from head/at, or on any other error.
// TODO specify watch state after error.
//
// Upon watch request, either initially, or after sending "ok", the server will be notifying the
// client about file blocks that client needs to pin in order to observe file's
......@@ -181,6 +178,10 @@
// files to @<revX>/bigfile/<bigfileX> drops to zero, and automatically
// destroys @<revX>/bigfile/<bigfileX> after reasonable timeout.
//
// The client should send "bye" before closing head/watch file:
//
// C: <2·k+1> bye
//
//
// Protection against slow or faulty clients
//
......@@ -261,7 +262,7 @@ package main
//
// The invariant helps on invalidation: when δFtail (see below) sees a
// changed oid, it is guaranteed that if the change affects block that was
// ever provided to OS, δFtail will detect that this block has changed. XXX review
// ever provided to OS, δFtail will detect that this block has changed.
// And if oid relates to a file block but is not in δFtail's tracking set -
// we know that block is not cached and will trigger ZODB load on a future
// file read.
......@@ -274,42 +275,26 @@ package main
// 4) when we receive an invalidation message from ZODB - we process it and
// propagate invalidations to OS file cache of head/bigfile/*:
//
// invalidation message: (tid↑, []oid)
//
// 4.1) zhead.cache.lookup(oid) XXX -> δFtail
// 4.2) ø: nothing to do - see invariant ^^^.
// 4.3) obj found:
// invalidation message: δZ = (tid↑, []oid)
//
// - ZBlk* -> [] of file/[]#blk
// - BTree/Bucket -> δ(BTree) -> file/[]#blk
// 4.1) δF = δFtail.Update(δZ)
//
// in the end after processing all []oid from invalidation message we have
// δFtail (see below) converts ZODB-level changes into information about
// which blocks of which files were modified and need to be invalidated:
//
// [] of file/[]#blk
// δF = (tid↑, {} file -> []#blk)
//
// that describes which file(s) parts needs to be invalidated.
// Note that δF might be not full and reflects only changes to files and
// blocks that were requested to be tracked. However because of the invariant
// δF covers in full what needs to be invalidated in the OS file cache.
//
// FIXME no - we can build it but not in full - since we consider only zobj in live cache.
// FIXME and even if we consider all δ'ed zobj, building complete set of
// file.δtail requires to first do complete scan of file.blktab
// which is prohibitively expensive.
// XXX -> we'll do the scan, but only Trees _without_ Buckets. This
// makes the scan practical until 1PB while helping to build
// initial tracking set for δFtail.
// Eager invalidation would require full scan - Trees _and_
// Buckets, which makes it prohibitively expensive - see (+).
//
// FIXME all ^^^ is outdated -> XXX δFtail
//
// 4.4) for all file/blk to invalidate we do:
// 4.2) for all file/blk to invalidate we do:
//
// - try to retrieve head/bigfile/file[blk] from OS file cache(*);
// - if retrieved successfully -> store retrieved data back into OS file
// cache for @<rev>/bigfile/file[blk], where
//
// # see below about file.δtail
// # XXX -> file.BlkRevAt(#blk, zhead.at)
// rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at
// rev = δFtail.BlkRevAt(file, #blk, zhead.at)
//
// - invalidate head/bigfile/file[blk] in OS file cache.
//
......@@ -318,30 +303,41 @@ package main
// won't be served from OS file cache and instead will trigger a FUSE read
// request to wcfs.
//
// 4.5) no invalidation messages are sent to wcfs clients at this point(+).
// 4.3) no invalidation messages are sent to wcfs clients at this point(+).
//
// 4.6) processing ZODB invalidations and serving file reads (see 7) are
// 4.4) processing ZODB invalidations and serving file reads (see 7) are
// organized to be mutually exclusive.
//
// 5.5) similarly, processing ZODB invalidations and setting up watches (see
// 7.2) are organized to be mutually exclusive.
//
// 5) after OS file cache was invalidated, we resync zhead to new database
// view corresponding to tid.
//
// 6) for every file δtail invalidation info about head/data is maintained: XXX -> δFtail
// 6) a ZBigFile-level history tail is maintained in δFtail.
//
// - tailv: [](rev↑, []#blk)
// - by: {} #blk -> []rev↑ in tail
// δFtail translates ZODB object-level changes into information about which
// blocks of which ZBigFile were modified, and provides service to query
// that information.
//
// δtail.tail describes invalidations to file we learned from ZODB invalidation.
// δtail.by allows to quickly lookup information by #blk.
// It semantically consists of
//
// min(rev) in δtail is min(@at) at which head/bigfile/file is currently watched (see below).
// []δF
//
// XXX δtail can miss ...
// where δF represents a change in files space
//
// δF:
// .rev↑
// {} file -> {}blk
//
// min(rev) in δFtail is min(@at) at which head/bigfile/file is currently watched (see below).
//
// to support initial openings with @at being slightly in the past, we also
// make sure that min(rev) is enough to cover last 10 minutes of history
// make sure that min(rev) is enough to cover last 1 minute of history
// from head/at.
//
// See ΔFtail documentation in internal/zdata/δftail.go for more details.
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
// 7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
......@@ -354,21 +350,16 @@ package main
// it is not exact because BTree/Bucket can change (e.g. rebalance)
// but still point to the same k->ZBlk.
//
// we also use file.δtail to find either exact blk revision: XXX δFtail
//
// rev(blk) = max(file.δtail.by(#blk) -> []rev↑)
//
// or another upper bound if #blk ∉ δtail:
//
// rev(blk) ≤ min(rev ∈ δtail) ; #blk ∉ δtail
// we also use δFtail to find either exact blk revision or another upper
// bound if file[blk] has no change during δFtail coverage:
//
// rev(blk) = δFtail.BlkRevAt(file, #blk, zhead.at)
//
// below rev'(blk) is min(of the estimates found):
//
// rev(blk) ≤ rev'(blk) rev'(blk) = min(^^^)
//
//
// XXX we delay recomputing δFtail.BlkRevAt(file, #blk, head) because
// Note: we delay recomputing δFtail.BlkRevAt(file, #blk, head) because
// using just cheap revmax estimate can frequently result in all watches
// being skipped.
//
......@@ -377,7 +368,7 @@ package main
// - rev'(blk) ≤ at: -> do nothing
// - rev'(blk) > at:
// - if blk ∈ watch.pinned -> do nothing
// - rev = max(δtail.by(#blk) : _ ≤ at) || min(rev ∈ δtail : rev ≤ at) || at
// - rev = δFtail.BlkRevAt(file, #blk, at)
// - watch.pin(file, #blk, @rev)
// - watch.pinned += blk
//
......@@ -416,7 +407,7 @@ package main
// transaction is maintained. For zhead, every time it is resynced (see "5")
// the transaction associated with zhead is renewed.
//
// XXX 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
// TODO 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
......@@ -425,6 +416,7 @@ package main
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
// Wcfs locking organization
//
// As it was said processing ZODB invalidations (see "4") and serving file
......@@ -463,13 +455,14 @@ package main
// WatchLink.byfileMu > BigFileDir.fileMu
// WatchLink.byfileMu > Watch.atMu
// Notation used
//
// δZ - change in ZODB space
// δB - change in BTree*s* space
// δT - change in BTree(1) space
// δF - change in File*s* space
// δfile - change in File(1) space XXX -> δf ?
// δfile - change in File(1) space
//
// f - BigFile
// bfdir - BigFileDir
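For orientation, the Go shapes behind δF and δfile, as they are used throughout this patch (δF.ByFile, δfile.Size, ΔFile.Rev/Epoch/Blocks/Size), look roughly as follows. This is a sketch inferred from the call sites; the authoritative definitions live in internal/zdata and may carry more fields:

// sketch of the δF/δfile shapes implied by the code in this patch
type ΔFile struct {
	Rev    zodb.Tid // revision of the change
	Epoch  bool     // the ZBigFile object itself changed (file epoch)
	Blocks setI64   // blocks changed in this revision
	Size   bool     // file size changed
}

type ΔF struct {
	Rev    zodb.Tid
	ByFile map[zodb.Oid]*ΔFile // file oid -> what changed in that file
}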
......@@ -491,7 +484,7 @@ import (
"sync"
"sync/atomic"
"syscall"
// "time"
"time"
log "github.com/golang/glog"
......@@ -511,23 +504,20 @@ import (
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/pkg/errors"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/set"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xzodb"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/zdata"
)
// ZBigFile-related types
// XXX place
// shorthands for ZBigFile and ZBlk*
type (
ZBlk = zdata.ZBlk
ZBlk0 = zdata.ZBlk0
ZBlk1 = zdata.ZBlk1
ZData = zdata.ZData
ZBigFile = zdata.ZBigFile
setI64 = set.I64
)
// Root represents root of wcfs filesystem.
type Root struct {
fsNode
......@@ -570,6 +560,8 @@ type Head struct {
// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
// with additional locking protocol to avoid deadlocks (see below for
// pauseOSCacheUpload + ...).
//
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
zheadMu sync.RWMutex
zconn *xzodb.ZConn // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
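One possible shape for the zheadMu TODO above — a lock whose wait can be canceled via context. A sketch only, assuming golang.org/x/sync/semaphore; the read/write aspect of zheadMu is deliberately left out:

package main

import (
	"context"

	"golang.org/x/sync/semaphore"
)

// ctxMutex is an exclusive lock whose Lock can be canceled via ctx.
type ctxMutex struct{ s *semaphore.Weighted }

func newCtxMutex() *ctxMutex { return &ctxMutex{s: semaphore.NewWeighted(1)} }

// Lock blocks until the lock is acquired or ctx is canceled.
func (m *ctxMutex) Lock(ctx context.Context) error { return m.s.Acquire(ctx, 1) }

func (m *ctxMutex) Unlock() { m.s.Release(1) }

func main() {
	m := newCtxMutex()
	if err := m.Lock(context.Background()); err != nil {
		return
	}
	defer m.Unlock()
	// critical section
}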
......@@ -600,8 +592,7 @@ type BigFileDir struct {
// δ tail of tracked BTree nodes of all BigFiles + -> which file
// (used only for head/, not revX/)
δFmu sync.RWMutex // zheadMu.W | zheadMu.R + δFmu.X
δFtail *zdata.ΔFtail
δFtail *zdata.ΔFtail // read/write access protected by zheadMu.{R,W}
}
// /(head|<rev>)/bigfile/<bigfileX> - served by BigFile.
......@@ -617,23 +608,12 @@ type BigFile struct {
// things read/computed from .zfile; constant during lifetime of current transaction.
// i.e. changed under zhead.W
blksize int64 // zfile.blksize
size int64 // zfile.Size()
rev zodb.Tid // last revision that modified zfile data
// XXX we can't know rev fully as some later blocks could be learnt only
// while populating δFtail lazily
// XXX or then it is not "constant during lifetime of current txn"
// // tail change history of this file.
// //
// // XXX computationally expensive to start - see "Invalidations to wcfs
// // clients are delayed ..." in notes.txt
// δtail *ΔTailI64 // [](rev↑, []#blk)
// blocks that were ever read-accessed (head/ only) XXX locking by bfdir.δFmu ?
// XXX = δFtail.Tracked(f) ?
// XXX goes away if δFtail query returns only tracked blocks
accessed setI64
blksize int64 // zfile.blksize
size int64 // zfile.Size()
revApprox zodb.Tid // approx last revision that modified zfile data
// ( we can't know rev fully as some later blocks could be learnt only
// while populating δFtail lazily. For simplicity we don't delve into
// updating revApprox during lifetime of current transaction )
// inflight loadings of ZBigFile from ZODB.
// successful load results are kept here until blkdata is put into OS pagecache.
......@@ -754,16 +734,6 @@ func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolic
return 0
}
/*
// -------- zhead lock/wait --------
// XXX needed?
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
func (head *Head) zheadRLock() { head.zheadMu.RLock() }
func (head *Head) zheadRUnlock() { head.zheadMu.RUnlock() }
func (head *Head) zheadLock() { head.zheadMu.Lock() }
func (head *Head) zheadUnlock() { head.zheadMu.Unlock() }
*/
// -------- 4) ZODB invalidation -> OS cache --------
......@@ -800,7 +770,7 @@ func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err er
case zevent, ok = <-zwatchq:
if !ok {
traceZWatch("zwatchq closed")
return nil // closed XXX ok?
return nil // closed
}
}
......@@ -830,7 +800,7 @@ func (root *Root) handleδZ(ctx context.Context, δZ *zodb.EventCommit) (err err
head := root.head
// while we are invalidating OS cache, make sure that nothing, that
// even reads /head/bigfile/*, is running (see 4.6).
// even reads /head/bigfile/*, is running (see 4.4).
//
// also make sure that cache uploaders we spawned (uploadBlk) are all
// paused, or else they could overwrite OS cache with stale data.
......@@ -839,7 +809,7 @@ func (root *Root) handleδZ(ctx context.Context, δZ *zodb.EventCommit) (err err
continueOSCacheUpload := make(chan struct{})
retry:
for {
// XXX ctx cancel
// TODO ctx cancel
head.zheadMu.Lock()
head.pauseOSCacheUpload = true
head.continueOSCacheUpload = continueOSCacheUpload
......@@ -867,9 +837,7 @@ retry:
bfdir := head.bfdir
// invalidate kernel cache for data in changed files
// NOTE no δFmu lock needed because zhead is WLocked
// δF, err := bfdir.δFtail.Update(δZ, zhead) // δF <- δZ |tracked
δF, err := bfdir.δFtail.Update(δZ) // δF <- δZ |tracked
if err != nil {
return err
......@@ -944,11 +912,10 @@ retry:
}
// resync .zhead to δZ.tid
// XXX -> Head.Resync() ?
// 1. abort old and resync to new txn/at
transaction.Current(zhead.TxnCtx).Abort()
_, ctx = transaction.New(context.Background()) // XXX bg ok?
_, ctx = transaction.New(context.Background())
err = zhead.Resync(ctx, δZ.Tid)
if err != nil {
return err
......@@ -957,37 +924,53 @@ retry:
// 2. restat invalidated ZBigFile
// NOTE no lock needed since .blksize and .size are constant during lifetime of one txn.
// XXX -> parallel
for foid := range δF.ByFile {
// TODO -> parallel
for foid, δfile := range δF.ByFile {
file := bfdir.fileTab[foid] // must be present
zfile := file.zfile
// XXX need to do only if δfile.Size changed
size, sizePath, blkCov, err := zfile.Size(ctx)
if err != nil {
return err
}
if δfile.Size {
size, sizePath, blkCov, err := zfile.Size(ctx)
if err != nil {
return err
}
file.size = size
// see "3) for */head/data the following invariant is maintained..."
bfdir.δFtail.Track(zfile, -1, sizePath, blkCov, nil)
file.size = size
// see "3) for */head/data the following invariant is maintained..."
bfdir.δFtail.Track(zfile, -1, sizePath, blkCov, nil)
}
// XXX we can miss a change to file if δblk is not yet tracked
// -> need to update file.rev at read time -> locking=XXX
file.rev = zhead.At()
// NOTE we can miss a change to file if δblk is not yet tracked
// that's why revision is only approximated
file.revApprox = zhead.At()
}
// notify .wcfs/zhead
for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(xio.BindCtxW(sk, ctx), "%s\n", δZ.Tid)
if err != nil {
log.Errorf("%s", err) // XXX errctx + file, handle, reader pid
log.Errorf("zhead: %s: write: %s (detaching reader)", sk, err)
sk.Close()
delete(gdebug.zheadSockTab, sk)
}
}
// XXX δFtail.ForgetPast(...)
// shrink δFtail not to grow indefinitely.
// cover history for at least 1 minute, but including all watches.
// No need to lock anything because we are holding zheadMu and
// setupWatch too runs with zheadMu locked.
//
// TODO shrink δFtail only once in a while - there is no need to compute
// revCut and cut δFtail on every transaction.
revCut := zodb.TidFromTime(zhead.At().Time().Add(-1*time.Minute))
for wlink := range head.wlinkTab {
for _, w := range wlink.byfile {
if w.at < revCut {
revCut = w.at
}
}
}
bfdir.δFtail.ForgetPast(revCut)
// notify zhead.At waiters
for hw := range head.hwait {
......@@ -1016,7 +999,7 @@ func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) {
panic("must be called only for head/, not @revX/")
}
// XXX check wcfs.down
// TODO check wcfs.down
// check if zhead is already ≥ at
head.zheadMu.RLock()
......@@ -1044,7 +1027,7 @@ func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) {
// invalidateBlk invalidates 1 file block in kernel cache.
//
// see "4.4) for all file/blk to in invalidate we do"
// see "4.2) for all file/blk to in invalidate we do"
// called with zheadMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)
......@@ -1073,7 +1056,7 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
blkdata = make([]byte, blksize)
n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
if st != fuse.OK {
// XXX warn
log.Errorf("%s: retrieve blk #%d from cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, st)
}
blkdata = blkdata[:n]
}
......@@ -1168,27 +1151,7 @@ func (root *Root) lockRevFile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, unlock fu
frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
defer xerr.Contextf(&err, "/: lockRevFile %s", frevpath)
// FIXME checking for "node{0}" is fragile:
// XXX the node could be still forgotten since we are not holding open on it
// XXX -> always os.open unconditionally for now
// or is it ok since it is just a cache?
// -> no, not ok: if inode ID is forgotten, the same ID could be
// reallocated to another file and then we'll corrupt in-kernel
// cache by wrongly storing data of one file into cache of
// another file.
// -> to avoid this we need to always lock the inode ID with real open.
// XXX (also disabled for now due to race-detector)
/*
// first check without going through kernel, whether the inode maybe known already
xfrev := fsconn.LookupNode(root.Inode(), frevpath)
if xfrev != nil {
if xfrev.String() != "node{0}" {
return xfrev.Node().(*BigFile), func(){}, nil
}
}
*/
// we have to ping the kernel
// open through kernel
frevospath := gmntpt + "/" + frevpath // now starting from OS /
f, err := os.Open(frevospath)
if err != nil {
......@@ -1204,7 +1167,7 @@ func (root *Root) lockRevFile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, unlock fu
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
f.head.zheadMu.RLock() // XXX +fctx to cancel
f.head.zheadMu.RLock() // TODO +fctx to cancel
defer f.head.zheadMu.RUnlock()
// cap read request to file size
......@@ -1228,10 +1191,10 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
if re := end % f.blksize; re != 0 {
aend += f.blksize - re
}
// XXX use original dest if it can fit the data
// TODO use original dest if it can fit the data
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
// TODO better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(fctx, f.head.zconn.TxnCtx)
defer cancel()
......@@ -1283,7 +1246,7 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
case <-loading.ready:
if loading.err == nil {
copy(dest, loading.blkdata) // XXX copy
copy(dest, loading.blkdata) // TODO avoid copy
}
return loading.err
}
......@@ -1291,6 +1254,22 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// noone was loading - we became responsible to load this block
blkdata, treepath, blkcov, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
// head/ - update δFtail + pin watchers
if f.head.rev == 0 && err == nil {
// update δFtail index
// see "3) for */head/data the following invariant is maintained..."
δFtail := f.head.bfdir.δFtail
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
err = f.readPinWatchers(ctx, blk, blkrevMax)
if err != nil {
blkdata = nil
}
}
loading.blkdata = blkdata
loading.err = err
......@@ -1303,13 +1282,9 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
return err
}
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
f.readPinWatchers(ctx, blk, treepath, blkcov, zblk, blkrevMax)
// data can be used now
close(loading.ready)
copy(dest, blkdata) // XXX copy
copy(dest, blkdata) // TODO avoid copy
// store to kernel pagecache whole block that we've just loaded from database.
// This way, even if the user currently requested to read only small portion from it,
......@@ -1423,7 +1398,7 @@ func traceIso(format string, argv ...interface{}) {
//
// must be called with atMu rlocked.
//
// XXX error - when? or close watch on any error?
// TODO close watch on any error
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
return w._pin(ctx, blk, rev)
......@@ -1453,7 +1428,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
}
w.pinnedMu.Unlock()
<-blkpin.ready // XXX + ctx ? (or just keep ready ?)
<-blkpin.ready // TODO +ctx cancel
if blkpin.rev == rev {
// already pinned
......@@ -1462,7 +1437,7 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
}
// relock the watch and check that w.pinned[blk] is the same. Retry if it is not.
// ( w.pinned[blk] could have changed while w.mu was not held e.g. by XXX recheck
// ( w.pinned[blk] could have changed while w.mu was not held e.g. by
// simultaneous setupWatch if we were called by readPinWatchers )
w.pinnedMu.Lock()
if blkpin == w.pinned[blk] {
......@@ -1518,29 +1493,21 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zheadMu rlocked.
// Must be called only for f under head/
// Must be called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btree.LONode, blkcov btree.LKeyRange, zblk ZBlk, blkrevMax zodb.Tid) {
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) {
defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk
// only head/ is being watched for
if f.head.rev != 0 {
return
panic("BUG: readPinWatchers: called for file under @revX/")
}
// fmt.Printf("S: read #%d -> pin watchers (#%d)\n", blk, len(f.watchTab))
// update δFtail index XXX -> move upper into readBlk ?
// (δFtail is just for δZ -> δF invalidation handling and is needed without isolation protocol)
// XXX ^^^ no - also need to query to send pins
// see "3) for */head/data the following invariant is maintained..."
bfdir := f.head.bfdir
δFtail := bfdir.δFtail
bfdir.δFmu.Lock() // XXX locking correct? XXX -> better push down?
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk) // XXX pass in zblk.rev here?
f.accessed.Add(blk)
bfdir.δFmu.Unlock()
//fmt.Printf("S: read #%d -> pin watchers (#%d)\n", blk, len(f.watchTab))
// make sure that file[blk] on clients side stays as of @w.at state.
......@@ -1548,6 +1515,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// This is likely to be the case, since most watchers should be usually close to head.
// If using blkrevMax only turns out to be not sufficient, we'll
// consult δFtail, which might involve recomputing it.
δFtail := f.head.bfdir.δFtail
blkrev := blkrevMax
blkrevRough := true
......@@ -1576,7 +1544,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
var err error
blkrev, _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil {
panic(err) // XXX
return err
}
blkrevRough = false
......@@ -1593,32 +1561,25 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared.
pinrev, _, err := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go?
// XXX ^^^ w.file vs f ?
if err != nil {
panic(err) // XXX
}
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
wg.Go(func(ctx context.Context) error {
defer w.atMu.RUnlock()
// XXX close watcher on any error
pinrev, _, err := δFtail.BlkRevAt(ctx, f.zfile, blk, w.at)
if err != nil {
return err
}
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
// TODO close watcher on any error
return w.pin(ctx, blk, pinrev)
})
}
f.watchMu.RUnlock()
err := wg.Wait()
if err != nil {
panic(err) // XXX
}
return wg.Wait()
}
// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
// It sends "pin" notifications; final "ok" or "error" must be sent by caller.
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
head := wlink.head
......@@ -1638,7 +1599,6 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
defer head.zheadMu.RUnlock()
headAt := head.zconn.At()
// XXX δFtail locking? (or ForgetPast is called only with zheadMu.W ?)
if at != zodb.InvalidTid && at < bfdir.δFtail.Tail() {
return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
headAt, headAt.Time().Sub(at.Time().Time))
......@@ -1686,7 +1646,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
w.atMu.Lock()
// check at >= w.at
// XXX we might want to allow going back in history if we need it.
// TODO(?) we might want to allow going back in history if we need it.
if !(at >= w.at) {
w.atMu.Unlock()
f.watchMu.Unlock()
......@@ -1697,7 +1657,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// register w to f early, so that READs going in parallel to us
// preparing and processing initial pins, also send pins to w for read
// blocks. If we don't, we can miss to send pin to w for a freshly read
// block which could have revision > w.at: XXX test
// block which could have revision > w.at:
//
// 1 3 2 4
// ─────.────x───o────x───x──────]──────────
......@@ -1736,15 +1696,24 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
f.watchMu.Unlock()
wlink.byfileMu.Unlock()
// XXX defer -> unregister watch if error?
// TODO defer -> unregister watch if error
// pin all tracked file blocks that were changed in (at, head] range.
toPin := map[int64]zodb.Tid{} // blk -> @rev
δFtail := bfdir.δFtail
vδf, err := δFtail.SliceByFileRev(f.zfile, at, headAt) // XXX locking δFtail
vδf, err := δFtail.SliceByFileRevEx(f.zfile, at, headAt, zdata.QueryOptions{
// blk might be in δFtail because it is adjacent in
// ZBigFile.blktab to another blk that was explicitly tracked.
// We do not want to get those to avoid unnecessarily pinning
// potentially more blocks than needed.
//
// wcfs tests also verify that only blocks that were previously
// explicitly accessed are included into watch setup pins.
OnlyExplicitlyTracked: true,
})
if err != nil {
panic(err) // XXX
return err
}
for _, δfile := range vδf {
if δfile.Epoch {
......@@ -1767,24 +1736,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
if already {
continue
}
// blk might be in δFtail because it is adjacent in
// ZBigFile.blktab to another blk that was explicitly
// tracked. However wcfs tests expect that only blocks
// that were previously explicitly accessed are
// included into watch setup pins.
//
// XXX adjust wcfs tests to not require only accessed
// blocks to be in setup pins? But that would mean that
// potentially more blocks would be potentially
// _unnecessarily_ pinned if they are not going to be
// accessed at all.
if !f.accessed.Has(blk) {
continue
}
toPin[blk], _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, at)
if err != nil {
panic(err) // XXX
return err
}
}
}
......@@ -1792,7 +1747,6 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
for blk, pinPrev := range w.pinned {
// only 1 setupWatch can be run simultaneously for one file
// XXX assert pinPrev.rev != zodb.TidMax
pinNew, pinning := toPin[blk]
if !pinning {
......@@ -1808,7 +1762,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
}
// downgrade atMu.W -> atMu.R to let other clients to access the file.
// XXX there is no primitive to do Wlock->Rlock atomically, but we are
// NOTE there is no primitive to do Wlock->Rlock atomically, but we are
// ok with that since we prepared everything to handle simultaneous pins
// from other reads.
w.atMu.Unlock()
......@@ -1833,7 +1787,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// XXX check flags?
// TODO(?) check flags
head := wnode.head
wlink := &WatchLink{
......@@ -1857,7 +1811,6 @@ func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fus
// wcfs initiated pin requests.
func (wlink *WatchLink) serve() {
err := wlink._serve()
// XXX log error if !(close || EOF)
if err != nil {
log.Error(err)
}
......@@ -1871,7 +1824,7 @@ func (wlink *WatchLink) serve() {
func (wlink *WatchLink) _serve() (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
ctx0 := context.TODO() // XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)
ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0)
wg := xsync.NewWorkGroup(ctx)
......@@ -1934,10 +1887,8 @@ func (wlink *WatchLink) _serve() (err error) {
return e
})
// XXX recheck that it is safe to handle multiple simultaneous watch requests.
for {
l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS
l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS
if err != nil {
// r.Read is woken up by sk.CloseRead when serve decides to exit
if err == io.ErrClosedPipe || err == io.EOF {
......@@ -1969,7 +1920,7 @@ func (wlink *WatchLink) _serve() (err error) {
// client-initiated request
// bye TODO document in "Isolation protocol"
// bye
if msg == "bye" {
return nil // deferred sk.Close will wake-up rx on client side
}
......@@ -2011,16 +1962,37 @@ func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
// sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
// XXX err ctx
defer xerr.Context(&err, "sendReq") // wlink is already put into ctx by caller
var stream uint64
for stream == 0 {
stream = atomic.AddUint64(&wlink.reqNext, +2)
}
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
rxq := make(chan string, 1)
wlink.rxMu.Lock()
wlink.rxTab[stream] = rxq // XXX assert .stream is not there?
_, already := wlink.rxTab[stream]
if !already {
wlink.rxTab[stream] = rxq
}
wlink.rxMu.Unlock()
if already {
panic("BUG: to-be-sent stream is present in rxtab")
}
defer func() {
if err != nil {
// remove rxq from rxTab
// ( _serve could have already deleted it if unexpected
// reply came to the stream, but no other rxq should
// have registered on the [stream] slot )
wlink.rxMu.Lock()
delete(wlink.rxTab, stream)
wlink.rxMu.Unlock()
// no need to drain rxq - it was created with cap=1
}
}()
err = wlink.send(ctx, stream, req)
if err != nil {
......@@ -2029,7 +2001,6 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
select {
case <-ctx.Done():
// XXX del rxTab[stream] ?
return "", ctx.Err()
case reply = <-rxq:
......@@ -2040,16 +2011,20 @@ func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string,
// send sends a message to client over specified stream ID.
//
// Multiple send can be called simultaneously; send serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
// XXX err ctx
// XXX assert '\n' not in msg
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) (err error) {
defer xerr.Contextf(&err, "send .%d", stream) // wlink is already put into ctx by caller
// assert '\n' not in msg
if strings.ContainsRune(msg, '\n') {
panicf("BUG: msg contains \\n ; msg: %q", msg)
}
wlink.txMu.Lock()
defer wlink.txMu.Unlock()
pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
traceIso("S: wlink%d: tx: %q\n", wlink.id, pkt)
_, err := wlink.sk.Write(ctx, pkt)
_, err = wlink.sk.Write(ctx, pkt)
if err != nil {
return err
}
......@@ -2079,7 +2054,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
return nil, eINVALf("not oid")
}
bfdir.head.zheadMu.RLock() // XXX +fctx -> cancel
bfdir.head.zheadMu.RLock() // TODO +fctx -> cancel
defer bfdir.head.zheadMu.RUnlock()
defer func() {
......@@ -2098,7 +2073,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
}
// not there - without bfdir lock proceed to open BigFile from ZODB
f, err = bfdir.head.bigopen(fctx, oid)
f, err = bfdir.head.bigfopen(fctx, oid)
if err != nil {
return nil, err
}
......@@ -2109,7 +2084,7 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
f2, already := bfdir.fileTab[oid]
if already {
bfdir.fileMu.Unlock()
f.Close()
// f.Close() not needed - BigFile is all just garbage-collected
return f2, nil
}
......@@ -2158,7 +2133,6 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
}
// not there - without revMu lock proceed to open @rev view of ZODB
// zconnRev, err := root.zopenAt(fctx, rev)
zconnRev, err := xzodb.ZOpen(fctx, root.zdb, &zodb.ConnOptions{At: rev})
if err != nil {
return nil, err
......@@ -2177,17 +2151,17 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
}
revDir = &Head{
// XXX how to test forgets:
// TODO how to test forgets:
// echo 2 >/proc/sys/vm/drop_caches (root)
// mount -i -oremount $mntpt (root ?) (shrinks dcache)
// notify invalidate dentry from inside fs
fsNode: newFSNode(&fsOptions{Sticky: false}), // XXX + Head.OnForget() -> del root.revTab[]
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + Head.OnForget() -> del root.revTab[]
rev: rev,
zconn: zconnRev, // XXX + Head.OnForget() -> release zconn (= abort zconn.TxnCtx)
zconn: zconnRev, // TODO + Head.OnForget() -> release zconn (= abort zconn.TxnCtx)
}
bfdir := &BigFileDir{
fsNode: newFSNode(&fsOptions{Sticky: false}), // XXX + BigFileDir.OnForget()
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFileDir.OnForget()
head: revDir,
fileTab: make(map[zodb.Oid]*BigFile),
δFtail: nil, // δFtail not needed/used for @revX/
......@@ -2200,22 +2174,21 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
// mkdir takes filesystem treeLock - do it outside revMu.
mkdir(root, name, revDir)
mkdir(revDir, "bigfile", bfdir)
// XXX + "at"
return revDir, nil
}
// bigopen opens BigFile corresponding to oid on head.zconn.
// bigfopen opens BigFile corresponding to oid on head.zconn.
//
// A ZBigFile corresponding to oid is activated and statted.
//
// head.zconn must be locked.
func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
// head.zheadMu must be locked.
func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
zconn := head.zconn
defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At())
defer xerr.Contextf(&err, "bigfopen %s @%s", oid, zconn.At())
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
// TODO better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(ctx, zconn.TxnCtx)
defer cancel()
......@@ -2225,7 +2198,7 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
case *zodb.NoObjectError:
return nil, eINVAL(err)
case *zodb.NoDataError:
return nil, eINVAL(err) // XXX what to do if it was existing and got deleted?
return nil, eINVAL(err)
default:
return nil, err
}
......@@ -2242,12 +2215,10 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
return nil, err
}
blksize := zfile.BlkSize()
// XXX it should be revision of both ZBigFile and its data. But we
// NOTE file revision should be revision of both ZBigFile and its data. But we
// cannot get data revision without expensive scan of all ZBigFile's objects.
// -> approximate mtime initially with ZBigFile object mtime.
//
// XXX for @rev/... we can know initial mtime more exactly?
rev := zfile.PSerial()
revApprox := zfile.PSerial()
zfile.PDeactivate()
size, sizePath, blkCov, err := zfile.Size(ctx)
......@@ -2256,50 +2227,32 @@ func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err er
}
f := &BigFile{
fsNode: newFSNode(&fsOptions{Sticky: false}), // XXX + BigFile.OnForget -> del .head.bfdir.fileTab[]
head: head,
zfile: zfile,
blksize: blksize,
size: size,
rev: rev,
loading: make(map[int64]*blkLoadState),
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFile.OnForget -> del .head.bfdir.fileTab[]
head: head,
zfile: zfile,
blksize: blksize,
size: size,
revApprox: revApprox,
loading: make(map[int64]*blkLoadState),
}
// only head/ needs δFtail, f.δtail and watches.
// only head/ needs δFtail and watches.
if head.rev == 0 {
// see "3) for */head/data the following invariant is maintained..."
head.bfdir.δFmu.Lock() // XXX locking ok?
head.bfdir.δFtail.Track(f.zfile, -1, sizePath, blkCov, nil)
head.bfdir.δFmu.Unlock()
// FIXME: scan zfile.blktab - so that we can detect all btree changes
// see "XXX building δFtail lazily ..." in notes.txt
f.accessed = make(setI64)
f.watchTab = make(map[*Watch]struct{})
}
return f, nil
}
// Close release all resources of BigFile. XXX needed?
func (f *BigFile) Close() error {
// XXX locking?
f.zfile = nil
// f.zconn.Release()
// f.zconn = nil
f.head = nil
return nil
}
// ---- misc ---
// /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) {
// XXX cancel on fctx cancel
h.zheadMu.RLock()
h.zheadMu.RLock() // TODO +fctx -> cancel
defer h.zheadMu.RUnlock()
return []byte(h.zconn.At().String()), nil
......@@ -2309,7 +2262,7 @@ func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) {
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
at := head.rev
if at == 0 {
head.zheadMu.RLock() // XXX +fctx -> cancel
head.zheadMu.RLock() // TODO +fctx -> cancel
at = head.zconn.At()
head.zheadMu.RUnlock()
}
......@@ -2322,7 +2275,7 @@ func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fus
// /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
f.head.zheadMu.RLock() // XXX +fctx -> cancel
f.head.zheadMu.RLock() // TODO +fctx -> cancel
defer f.head.zheadMu.RUnlock()
f.getattr(out)
......@@ -2332,10 +2285,10 @@ func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fus
func (f *BigFile) getattr(out *fuse.Attr) {
out.Mode = fuse.S_IFREG | 0444
out.Size = uint64(f.size)
out.Blksize = uint32(f.blksize) // XXX 64 -> 32
out.Blksize = uint32(f.blksize) // NOTE truncating 64 -> 32
// .Blocks
mtime := f.rev.Time().Time
mtime := f.revApprox.Time().Time
out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
}
......@@ -2377,14 +2330,14 @@ type _wcfs_Zhead struct {
}
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// XXX check flags?
// TODO(?) check flags
sk := NewFileSock()
sk.CloseRead()
groot.head.zheadMu.Lock() // XXX +fctx -> cancel
groot.head.zheadMu.Lock() // TODO +fctx -> cancel
defer groot.head.zheadMu.Unlock()
// XXX del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
// TODO del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
gdebug.zheadSockTab[sk] = struct{}{}
return sk.File(), fuse.OK
}
......@@ -2392,8 +2345,8 @@ func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse
// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)
func main() {
stdlog.SetPrefix("wcfs: ")
//log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
//stdlog.SetPrefix("wcfs: ") NOTE conflicts with log.CopyStandardLogTo
log.CopyStandardLogTo("WARNING")
defer log.Flush()
err := _main()
......@@ -2405,8 +2358,6 @@ func main() {
func _main() (err error) {
debug := flag.Bool("d", false, "debug")
autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
// XXX option to prevent starting if wcfs was already started/mounted on mntpt ?
// XXX do the check unconditionally?
flag.Parse()
if len(flag.Args()) != 2 {
......@@ -2420,7 +2371,7 @@ func _main() (err error) {
err = xerr.First(err, c.Close())
}
// debug -> precise t, no dates (XXX -> always precise t?)
// debug -> precise t, no dates (TODO(?) -> always precise t?)
if *debug {
stdlog.SetFlags(stdlog.Lmicroseconds)
}
......@@ -2434,7 +2385,7 @@ func _main() (err error) {
log.Info(gover)
// open zodb storage/watch/db/connection
ctx := context.Background() // XXX + timeout?
ctx := context.Background() // TODO(?) + timeout?
zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{
ReadOnly: true,
})
......@@ -2505,7 +2456,7 @@ func _main() (err error) {
// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
MaxWrite: 2*1024*1024,
// XXX tune MaxReadAhead? MaxBackground?
// TODO(?) tune MaxReadAhead? MaxBackground?
// OS cache that we populate with bigfile data is precious;
// we explicitly propagate ZODB invalidations into file invalidations.
......@@ -2519,8 +2470,8 @@ func _main() (err error) {
if err != nil {
return err
}
groot = root // FIXME temp workaround (see ^^^)
gfsconn = fsconn // FIXME ----//----
groot = root // FIXME temp workaround (see ^^^)
gfsconn = fsconn // FIXME ----//----
gmntpt = mntpt
// we require proper pagecache control (added to Linux 2.6.36 in 2010)
......@@ -2544,7 +2495,7 @@ func _main() (err error) {
// add entries to /
mkdir(root, "head", head)
mkdir(head, "bigfile", bfdir)
mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at)
mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at)
mkfile(head, "watch", wnode)
// for debugging/testing
......@@ -2594,12 +2545,13 @@ func _main() (err error) {
if errors.Cause(err) != context.Canceled {
log.Error(err)
log.Errorf("zwatcher failed -> switching filesystem to EIO mode (TODO)")
//panic("TODO: switch fs to EIO mode") // XXX
// TODO: switch fs to EIO mode
}
// wait for unmount
// XXX the kernel does not send FORGETs on unmount - release left node resources ourselves?
// NOTE the kernel does not send FORGETs on unmount - but we don't need
// to release left node resources ourselves, because it is just memory.
<-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl)
return nil // XXX serveErr | zwatchErr ?
return nil
}