Commit 4430de41 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Handle ZODB invalidations

Use ΔFtail.Track on every READ, and query accumulated ΔFtail upon
receiving ZODB invalidation to query it about which blocks of which
files have been changed. Then invalidate those blocks in OS file cache.

See added documentation to wcfs.go and notes.txt for details.

Now the filesystem is no longer stale: it provides view of data
that is uptodate wrt changes on ZODB storage.

Some preliminary history:

kirr/wendelin.core@9b4a42a3    X invalidation design draftly settled
kirr/wendelin.core@27d91d47    X δFtail settled
kirr/wendelin.core@33e0dfce    X ΔTail draftly done
kirr/wendelin.core@822366a7    X keeping fd to root opened prevents the filesystem from being unmounted
kirr/wendelin.core@89ad3a79    X Don't keep ZBigFile activated during whole current transaction
kirr/wendelin.core@245511ac    X Give pointer on from where to get nxd-fuse.ko
kirr/wendelin.core@d1cd128c    X Hit FUSE-related deadlock
kirr/wendelin.core@d134ee44    X FUSE lookup deadlock should be hopefully fixed
kirr/wendelin.core@0e60e9ff    X wcfs: Don't noise ZWatcher trace logs with "select ..."
kirr/wendelin.core@bf9a7405    X No longer rely on ZODB cache invariant for invalidations
parent 46f3f3fd
==============================================
Additional notes to documentation in wcfs.go
==============================================
This file contains notes additional to usage documentation and internal
organization overview in wcfs.go .
Notes on OS pagecache control
=============================
The cache of snapshotted bigfile can be pre-made hot if invalidated region
was already in pagecache of head/bigfile/file:
- we can retrieve a region from pagecache of head/file with FUSE_NOTIFY_RETRIEVE.
- we can store that retrieved data into pagecache region of @<revX>/ with FUSE_NOTIFY_STORE.
- we can invalidate a region from pagecache of head/file with FUSE_NOTIFY_INVAL_INODE.
we have to disable FUSE_AUTO_INVAL_DATA to tell the kernel we are fully
responsible for invalidating pagecache. If we don't, the kernel will be
clearing whole cache of head/file on e.g. its mtime change.
Note: disabling FUSE_AUTO_INVAL_DATA does not fully prevent kernel from automatically
invalidating pagecache - e.g. it will invalidate whole cache on file size changes:
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/fs/fuse/inode.c?id=e0bc833d10#n233
It was hoped that we could workaround it with using writeback mode (see !is_wb
in the link above), but it turned out that in writeback mode the kernel indeed
does not invalidate data cache on file size change, but neither it allows the
filesystem to set the size due to external event (see https://git.kernel.org/linus/8373200b12
"fuse: Trust kernel i_size only"). This prevents us to use writeback workaround
as we cannot even update the file from being empty to have some data.
-> we did the patch for FUSE to have proper flag for filesystem server to tell
the kernel it is fully responsible for invalidating pagecache. The patch is
part of Linux 5.2:
https://git.kernel.org/linus/ad2ba64dd489
Kernel locks page on read/cache store/... - we have to be careful not to deadlock
=================================================================================
The kernel, when doing FUSE operations, locks corresponding pages. For example
it locks a page, where it is going to read data into, before issuing FUSE read
request. Correspondingly, on e.g. cache store, the kernel also locks page where
data has to be stored.
It is easy to deadlock if we don't take this locks into account. For example
if we try to upload data to kernel pagecache from under serving read request,
this can deadlock.
Another case that needs to be cared about is interaction between uploadBlk and
zwatcher: zheadMu being RWMutex, does not allow new RLocks to be taken once
Lock request has been issued. Thus the following scenario is possible::
uploadBlk os.Read zwatcher
page.Lock
zheadMu.Rlock
zheadMu.Lock
page.Lock
zheadMu.Rlock
- zwatcher is waiting for uploadBlk to release zheadMu;
- uploadBlk is waiting for os.Read to release page;
- os.Read is waiting for zwatcher to release zheadMu;
- deadlock.
To avoid such deadlocks zwatcher asks OS cache uploaders to pause while it is
running, and retries taking zheadMu.Lock until all uploaders are indeed paused.
...@@ -60,7 +60,7 @@ ...@@ -60,7 +60,7 @@
// ... // ...
// //
// where /bigfile/<bigfileX> represents latest bigfile data as stored in // where /bigfile/<bigfileX> represents latest bigfile data as stored in
// upstream ZODB. // upstream ZODB. As there can be some lag receiving updates from the database,
// /at describes precisely ZODB state for which bigfile data is currently // /at describes precisely ZODB state for which bigfile data is currently
// exposed. // exposed.
// //
...@@ -126,6 +126,80 @@ package main ...@@ -126,6 +126,80 @@ package main
// //
// 1) 1 ZODB connection for "latest data" for whole filesystem (zhead). // 1) 1 ZODB connection for "latest data" for whole filesystem (zhead).
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At . // 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
//
// #blk ∈ OS file cache => all BTree/Bucket/ZBlk that lead to blk are tracked(%)
//
// The invariant helps on invalidation: when δFtail (see below) sees a
// changed oid, it is guaranteed that if the change affects block that was
// ever provided to OS, δFtail will detect that this block has changed.
// And if oid relates to a file block but is not in δFtail's tracking set -
// we know that block is not cached and will trigger ZODB load on a future
// file read.
//
// Currently we maintain this invariant by adding ZBlk/LOBTree/LOBucket
// objects to δFtail on every access, and never shrinking that tracking set.
// In the future we may want to try to synchronize to kernel freeing its
// pagecache pages.
//
// 4) when we receive an invalidation message from ZODB - we process it and
// propagate invalidations to OS file cache of head/bigfile/*:
//
// invalidation message: δZ = (tid↑, []oid)
//
// 4.1) δF = δFtail.Update(δZ)
//
// δFtail (see below) converts ZODB-level changes into information about
// which blocks of which files were modified and need to be invalidated:
//
// δF = (tid↑, {} file -> []#blk)
//
// Note that δF might be not full and reflects only changes to files and
// blocks that were requested to be tracked. However because of the invariant
// δF covers in full what needs to be invalidated in the OS file cache.
//
// 4.2) for all file/blk to invalidate we do:
//
// - try to retrieve head/bigfile/file[blk] from OS file cache(*);
// - if retrieved successfully -> store retrieved data back into OS file
// cache for @<rev>/bigfile/file[blk], where
//
// rev = δFtail.BlkRevAt(file, #blk, zhead.at)
//
// - invalidate head/bigfile/file[blk] in OS file cache.
//
// This preserves previous data in OS file cache in case it will be needed
// by not-yet-uptodate clients, and makes sure file read of head/bigfile/file[blk]
// won't be served from OS file cache and instead will trigger a FUSE read
// request to wcfs.
//
// 4.4) processing ZODB invalidations and serving file reads (see 7) are
// organized to be mutually exclusive.
//
// 5) after OS file cache was invalidated, we resync zhead to new database
// view corresponding to tid.
//
// 6) a ZBigFile-level history tail is maintained in δFtail.
//
// δFtail translates ZODB object-level changes into information about which
// blocks of which ZBigFile were modified, and provides service to query
// that information.
//
// It semantically consists of
//
// []δF
//
// where δF represents a change in files space
//
// δF:
// .rev↑
// {} file -> {}blk
//
// Scalability of δFtail plays important role in scalability of WCFS.
//
// See documentation in internal/zdata/δftail.go for more details on ΔFtail
// and its scalability properties.
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows: // 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
// //
// 7.1) load blkdata for head/bigfile/file[blk] @zhead.at . // 7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
...@@ -136,13 +210,45 @@ package main ...@@ -136,13 +210,45 @@ package main
// ZODB connection and without notifying any watches. // ZODB connection and without notifying any watches.
// //
// 9) for every ZODB connection (zhead + one per @<rev>) a dedicated read-only // 9) for every ZODB connection (zhead + one per @<rev>) a dedicated read-only
// transaction is maintained. // transaction is maintained. For zhead, every time it is resynced (see "5")
// the transaction associated with zhead is renewed.
// //
// TODO 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout // TODO 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
// Wcfs locking organization
//
// As it was said processing ZODB invalidations (see "4") and serving file
// reads (see "7") are organized to be mutually exclusive. To do so a major RW
// lock - zheadMu - is used. Whenever ZODB invalidations are processed and
// zhead.at is updated - zheadMu.W is taken. Contrary whenever file read is
// served and in other situations - which needs zhead to remain viewing
// database at the same state - zheadMu.R is taken.
//
// Several locks that protect internal data structures are minor to zheadMu -
// they need to be taken only under zheadMu.R (to protect e.g. multiple readers
// running simultaneously to each other), but do not need to be taken at all if
// zheadMu.W is taken. In data structures such locks are noted as follows
//
// xMu sync.Mutex // zheadMu.W | zheadMu.R + xMu
//
// If a lock is not minor to zheadMu, it is still ok to lock it under zheadMu.R
// as zheadMu, being the most major lock in wcfs, always comes locked first, if
// it needs to be locked.
// Notation used // Notation used
// //
// δZ - change in ZODB space
// δB - change in BTree*s* space
// δT - change in BTree(1) space
// δF - change in File*s* space
// δfile - change in File(1) space
//
// f - BigFile // f - BigFile
// bfdir - BigFileDir // bfdir - BigFileDir
...@@ -155,13 +261,18 @@ import ( ...@@ -155,13 +261,18 @@ import (
"math" "math"
"os" "os"
"runtime" "runtime"
"sort"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"syscall"
"time"
log "github.com/golang/glog" log "github.com/golang/glog"
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/go123/xruntime/race" "lab.nexedi.com/kirr/go123/xruntime/race"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
...@@ -202,6 +313,7 @@ type Root struct { ...@@ -202,6 +313,7 @@ type Root struct {
zdb *zodb.DB zdb *zodb.DB
// directory + ZODB connection for head/ // directory + ZODB connection for head/
// (zhead is Resync'ed and is kept outside zdb pool)
head *Head head *Head
// directories + ZODB connections for @<rev>/ // directories + ZODB connections for @<rev>/
...@@ -218,7 +330,28 @@ type Head struct { ...@@ -218,7 +330,28 @@ type Head struct {
// at - served by .readAt // at - served by .readAt
// ZODB connection for everything under this head // ZODB connection for everything under this head
zconn *xzodb.ZConn
// zheadMu protects zconn.At & live _objects_ associated with it.
// while it is rlocked zconn is guaranteed to stay viewing database at
// particular view.
//
// zwatcher write-locks this and knows noone is using ZODB objects and
// noone mutates OS file cache while zwatcher is running.
//
// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
// with additional locking protocol to avoid deadlocks (see below for
// pauseOSCacheUpload + ...).
//
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
zheadMu sync.RWMutex
zconn *xzodb.ZConn // for head/ zwatcher resyncs head.zconn; others only read zconn objects.
// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
pauseOSCacheUpload bool
continueOSCacheUpload chan struct{}
// uploadBlk signals to zwatcher that there are so many inflight OS cache uploads currently.
inflightOSCacheUploads int32
} }
// /(head|<rev>)/bigfile/ - served by BigFileDir. // /(head|<rev>)/bigfile/ - served by BigFileDir.
...@@ -227,8 +360,12 @@ type BigFileDir struct { ...@@ -227,8 +360,12 @@ type BigFileDir struct {
head *Head // parent head/ or @<rev>/ head *Head // parent head/ or @<rev>/
// {} oid -> <bigfileX> // {} oid -> <bigfileX>
fileMu sync.Mutex fileMu sync.Mutex // zheadMu.W | zheadMu.R + fileMu
fileTab map[zodb.Oid]*BigFile fileTab map[zodb.Oid]*BigFile
// δ tail of tracked BTree nodes of all BigFiles + -> which file
// (used only for head/, not revX/)
δFtail *zdata.ΔFtail // read/write access protected by zheadMu.{R,W}
} }
// /(head|<rev>)/bigfile/<bigfileX> - served by BigFile. // /(head|<rev>)/bigfile/<bigfileX> - served by BigFile.
...@@ -247,13 +384,16 @@ type BigFile struct { ...@@ -247,13 +384,16 @@ type BigFile struct {
blksize int64 // zfile.blksize blksize int64 // zfile.blksize
size int64 // zfile.Size() size int64 // zfile.Size()
revApprox zodb.Tid // approx last revision that modified zfile data revApprox zodb.Tid // approx last revision that modified zfile data
// ( we can't know rev fully as some later blocks could be learnt only
// while populating δFtail lazily. For simplicity we don't delve into
// updating revApprox during lifetime of current transaction )
// inflight loadings of ZBigFile from ZODB. // inflight loadings of ZBigFile from ZODB.
// successful load results are kept here until blkdata is put into OS pagecache. // successful load results are kept here until blkdata is put into OS pagecache.
// //
// Being a staging area for data to enter OS cache, loading has to be // Being a staging area for data to enter OS cache, loading has to be
// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache. // consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
loadMu sync.Mutex loadMu sync.Mutex // zheadMu.W | zheadMu.R + loadMu
loading map[int64]*blkLoadState // #blk -> {... blkdata} loading map[int64]*blkLoadState // #blk -> {... blkdata}
} }
...@@ -304,10 +444,377 @@ func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolic ...@@ -304,10 +444,377 @@ func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolic
} }
// -------- 4) ZODB invalidation -> OS cache --------
func traceZWatch(format string, argv ...interface{}) {
if !log.V(1) {
return
}
log.InfoDepth(1, fmt.Sprintf("zwatcher: " + format, argv...))
}
func debugZWatch(format string, argv ...interface{}) {
if !log.V(2) {
return
}
log.InfoDepth(1, fmt.Sprintf("zwatcher: " + format, argv...))
}
// zwatcher watches for ZODB changes.
//
// see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err error) {
defer xerr.Contextf(&err, "zwatch %s", root.zstor.URL())
traceZWatch(">>>")
var zevent zodb.Event
var ok bool
for {
debugZWatch("select ...")
select {
case <-ctx.Done():
traceZWatch("cancel")
return ctx.Err()
case zevent, ok = <-zwatchq:
if !ok {
traceZWatch("zwatchq closed")
return nil // closed
}
}
traceZWatch("zevent: %s", zevent)
switch zevent := zevent.(type) {
default:
return fmt.Errorf("unexpected event: %T", zevent)
case *zodb.EventError:
return zevent.Err
case *zodb.EventCommit:
err = root.handleδZ(ctx, zevent)
if err != nil {
return err
}
}
}
}
// handleδZ handles 1 change event from ZODB notification.
func (root *Root) handleδZ(ctx context.Context, δZ *zodb.EventCommit) (err error) {
defer xerr.Contextf(&err, "handleδZ @%s", δZ.Tid)
head := root.head
// while we are invalidating OS cache, make sure that nothing, that
// even reads /head/bigfile/*, is running (see 4.4).
//
// also make sure that cache uploaders we spawned (uploadBlk) are all
// paused, or else they could overwrite OS cache with stale data.
// see notes.txt -> "Kernel locks page on read/cache store/..." for
// details on how to do this without deadlocks.
continueOSCacheUpload := make(chan struct{})
retry:
for {
// TODO ctx cancel
head.zheadMu.Lock()
head.pauseOSCacheUpload = true
head.continueOSCacheUpload = continueOSCacheUpload
// NOTE need atomic load, since inflightOSCacheUploads
// decrement is done not under zheadMu.
if atomic.LoadInt32(&head.inflightOSCacheUploads) != 0 {
head.zheadMu.Unlock()
continue retry
}
break
}
defer func() {
head.pauseOSCacheUpload = false
head.continueOSCacheUpload = nil
head.zheadMu.Unlock()
close(continueOSCacheUpload)
}()
// zheadMu.W taken and all cache uploaders are paused
zhead := head.zconn
bfdir := head.bfdir
// invalidate kernel cache for data in changed files
δF, err := bfdir.δFtail.Update(δZ) // δF <- δZ |tracked
if err != nil {
return err
}
if log.V(2) {
// debug dump δF
log.Infof("\n\nS: handleδZ: δF (#%d):\n", len(δF.ByFile))
for foid, δfile := range δF.ByFile {
blkv := δfile.Blocks.Elements()
sort.Slice(blkv, func(i, j int) bool {
return blkv[i] < blkv[j]
})
flags := ""
if δfile.Size {
flags += "S"
}
if δfile.Epoch {
flags += "E"
}
log.Infof("S: \t- %s\t%2s %v\n", foid, flags, blkv)
}
log.Infof("\n\n")
}
// invalidate kernel cache for file data
wg := xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile {
// file was requested to be tracked -> it must be present in fileTab
file := bfdir.fileTab[foid]
if δfile.Epoch {
wg.Go(func(ctx context.Context) error {
return file.invalidateAll() // NOTE does not accept ctx
})
} else {
for blk := range δfile.Blocks {
blk := blk
wg.Go(func(ctx context.Context) error {
return file.invalidateBlk(ctx, blk)
})
}
}
}
err = wg.Wait()
if err != nil {
return err
}
// invalidate kernel cache for attributes
// we need to do it only if we see topology (i.e. btree) change
//
// do it after completing data invalidations.
wg = xsync.NewWorkGroup(ctx)
for foid, δfile := range δF.ByFile {
if !δfile.Size {
continue
}
file := bfdir.fileTab[foid] // must be present
wg.Go(func(ctx context.Context) error {
return file.invalidateAttr() // NOTE does not accept ctx
})
}
err = wg.Wait()
if err != nil {
return err
}
// resync .zhead to δZ.tid
// 1. abort old and resync to new txn/at
transaction.Current(zhead.TxnCtx).Abort()
_, ctx = transaction.New(context.Background())
err = zhead.Resync(ctx, δZ.Tid)
if err != nil {
return err
}
zhead.TxnCtx = ctx
// 2. restat invalidated ZBigFile
// NOTE no lock needed since .blksize and .size are constant during lifetime of one txn.
// TODO -> parallel
for foid, δfile := range δF.ByFile {
file := bfdir.fileTab[foid] // must be present
zfile := file.zfile
if δfile.Size {
size, sizePath, blkCov, err := zfile.Size(ctx)
if err != nil {
return err
}
file.size = size
// see "3) for */head/data the following invariant is maintained..."
bfdir.δFtail.Track(zfile, -1, sizePath, blkCov, nil)
}
// NOTE we can miss a change to file if δblk is not yet tracked
// that's why revision is only approximated
file.revApprox = zhead.At()
}
// notify .wcfs/zhead
for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(xio.BindCtxW(sk, ctx), "%s\n", δZ.Tid)
if err != nil {
log.Errorf("zhead: %s: write: %s (detaching reader)", sk, err)
sk.Close()
delete(gdebug.zheadSockTab, sk)
}
}
// shrink δFtail not to grow indefinitely.
// cover history for at least 1 minute.
//
// TODO shrink δFtail only once in a while - there is no need to
// cut δFtail on every transaction.
revCut := zodb.TidFromTime(zhead.At().Time().Add(-1*time.Minute))
bfdir.δFtail.ForgetPast(revCut)
return nil
}
// invalidateBlk invalidates 1 file block in kernel cache.
//
// see "4.2) for all file/blk to in invalidate we do"
// called with zheadMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)
fsconn := gfsconn
blksize := f.blksize
off := blk*blksize
var blkdata []byte = nil
// first try to retrieve f.loading[blk];
// make sure f.loading[blk] is invalidated.
//
// we are running with zheadMu wlocked - no need to lock f.loadMu
loading, ok := f.loading[blk]
if ok {
if loading.err == nil {
blkdata = loading.blkdata
}
delete(f.loading, blk)
}
// TODO skip retrieve/store if len(f.watchTab) == 0
// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
if blkdata == nil {
blkdata = make([]byte, blksize)
n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
if st != fuse.OK {
log.Errorf("%s: retrieve blk #%d from cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, st)
}
blkdata = blkdata[:n]
}
// if less than blksize was cached - probably the kernel had to evict
// some data from its cache already. In such case we don't try to
// preserve the rest and drop what was read, to avoid keeping the
// system overloaded.
//
// if we have the data - preserve it under @revX/bigfile/file[blk].
if int64(len(blkdata)) == blksize {
err := func() error {
// store retrieved data back to OS cache for file @<rev>/file[blk]
δFtail := f.head.bfdir.δFtail
blkrev, _, err := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil {
return err
}
frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid())
if err != nil {
return fmt.Errorf("BUG: %s", err)
}
defer funlock()
st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
if st != fuse.OK {
return fmt.Errorf("BUG: %s: store cache: %s", frev.path(), st)
}
return nil
}()
if err != nil {
log.Errorf("%s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
}
}
// invalidate file/head/data[blk] in OS file cache.
st := fsconn.FileNotify(f.Inode(), off, blksize)
if st != fuse.OK {
return syscall.Errno(st)
}
return nil
}
// invalidateAttr invalidates file attributes in kernel cache.
//
// complements invalidateBlk and is used to invalidate file size.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAttr() (err error) {
defer xerr.Contextf(&err, "%s: invalidate attr", f.path())
fsconn := gfsconn
st := fsconn.FileNotify(f.Inode(), -1, -1) // metadata only
if st != fuse.OK {
return syscall.Errno(st)
}
return nil
}
// invalidateAll invalidates file attributes and all file data in kernel cache.
//
// complements invalidateAttr and invalidateBlk and is used to completely reset
// kernel file cache on ΔFtail epoch.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAll() (err error) {
defer xerr.Contextf(&err, "%s: invalidate all", f.path())
fsconn := gfsconn
st := fsconn.FileNotify(f.Inode(), 0, -1) // metadata + all data
if st != fuse.OK {
return syscall.Errno(st)
}
return nil
}
// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock.
//
// We need node ID to be know to the kernel, when we need to store data into
// file's kernel cache - if the kernel don't have the node ID for the file in
// question, FileNotifyStoreCache will just fail.
//
// For kernel to know the inode lockRevFile issues regular filesystem lookup
// request which goes to kernel and should go back to wcfs. It is thus not safe
// to use lockRevFile from under FUSE request handler as doing so might deadlock.
//
// Caller must call unlock when inode ID is no longer required to be present.
// It is safe to simultaneously call multiple lockRevFile with the same arguments.
func (root *Root) lockRevFile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, unlock func(), err error) {
fsconn := gfsconn
frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
defer xerr.Contextf(&err, "/: lockRevFile %s", frevpath)
// open through kernel
frevospath := gmntpt + "/" + frevpath // now starting from OS /
f, err := os.Open(frevospath)
if err != nil {
return nil, nil, err
}
xfrev := fsconn.LookupNode(root.Inode(), frevpath)
// must be !nil as open succeeded
return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}
// -------- 7) FUSE read(#blk) -------- // -------- 7) FUSE read(#blk) --------
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data. // /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) { func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
f.head.zheadMu.RLock() // TODO +fctx to cancel
defer f.head.zheadMu.RUnlock()
// cap read request to file size // cap read request to file size
end, ok := overflow.Add64(off, int64(len(dest))) end, ok := overflow.Add64(off, int64(len(dest)))
if !ok { if !ok {
...@@ -361,6 +868,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context ...@@ -361,6 +868,7 @@ func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context
// see "7) when we receive a FUSE read(#blk) request ..." in overview. // see "7) when we receive a FUSE read(#blk) request ..." in overview.
// //
// len(dest) == blksize. // len(dest) == blksize.
// called with head.zheadMu rlocked.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) { func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk) defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)
...@@ -390,7 +898,15 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -390,7 +898,15 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
} }
// noone was loading - we became responsible to load this block // noone was loading - we became responsible to load this block
blkdata, _, _, _, _, err := f.zfile.LoadBlk(ctx, blk) blkdata, treepath, blkcov, zblk, _, err := f.zfile.LoadBlk(ctx, blk)
// head/ - update δFtail
if f.head.rev == 0 && err == nil {
// update δFtail index
// see "3) for */head/data the following invariant is maintained..."
δFtail := f.head.bfdir.δFtail
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)
}
loading.blkdata = blkdata loading.blkdata = blkdata
loading.err = err loading.err = err
...@@ -433,8 +949,47 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -433,8 +949,47 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache. // uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) { func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
head := f.head
// rlock zheadMu and make sure zwatcher is not asking us to pause.
// if it does - wait for a safer time not to deadlock.
// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
for {
head.zheadMu.RLock()
// help zwatcher if it asks us to pause uploadings, so it can
// take zheadMu wlocked without deadlocks.
if head.pauseOSCacheUpload {
ready := head.continueOSCacheUpload
head.zheadMu.RUnlock()
<-ready
continue retry
}
break
}
// zheadMu rlocked.
// zwatcher is not currently trying to pause OS cache uploads.
// check if this block was already invalidated by zwatcher.
// if so don't upload the block into OS cache.
f.loadMu.Lock()
loading_ := f.loading[blk]
f.loadMu.Unlock()
if loading != loading_ {
head.zheadMu.RUnlock()
return
}
oid := f.zfile.POid() oid := f.zfile.POid()
// signal to zwatcher not to run while we are performing the upload.
// upload with released zheadMu so that zwatcher can lock it even if to
// check inflightOSCacheUploads status.
atomic.AddInt32(&head.inflightOSCacheUploads, +1)
head.zheadMu.RUnlock()
st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata) st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)
f.loadMu.Lock() f.loadMu.Lock()
...@@ -444,6 +999,9 @@ func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) { ...@@ -444,6 +999,9 @@ func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
} }
f.loadMu.Unlock() f.loadMu.Unlock()
// signal to zwatcher that we are done and it can continue.
atomic.AddInt32(&head.inflightOSCacheUploads, -1)
if bug { if bug {
panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk) panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk)
} }
...@@ -481,6 +1039,9 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) ...@@ -481,6 +1039,9 @@ func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context)
return nil, eINVALf("not oid") return nil, eINVALf("not oid")
} }
bfdir.head.zheadMu.RLock() // TODO +fctx -> cancel
defer bfdir.head.zheadMu.RUnlock()
defer func() { defer func() {
if f != nil { if f != nil {
f.getattr(out) f.getattr(out)
...@@ -588,6 +1149,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) { ...@@ -588,6 +1149,7 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFileDir.OnForget() fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFileDir.OnForget()
head: revDir, head: revDir,
fileTab: make(map[zodb.Oid]*BigFile), fileTab: make(map[zodb.Oid]*BigFile),
δFtail: nil, // δFtail not needed/used for @revX/
} }
revDir.bfdir = bfdir revDir.bfdir = bfdir
...@@ -605,6 +1167,8 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) { ...@@ -605,6 +1167,8 @@ func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
// bigfopen opens BigFile corresponding to oid on head.zconn. // bigfopen opens BigFile corresponding to oid on head.zconn.
// //
// A ZBigFile corresponding to oid is activated and statted. // A ZBigFile corresponding to oid is activated and statted.
//
// head.zheadMu must be locked.
func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) { func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
zconn := head.zconn zconn := head.zconn
defer xerr.Contextf(&err, "bigfopen %s @%s", oid, zconn.At()) defer xerr.Contextf(&err, "bigfopen %s @%s", oid, zconn.At())
...@@ -642,7 +1206,7 @@ func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err e ...@@ -642,7 +1206,7 @@ func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err e
revApprox := zfile.PSerial() revApprox := zfile.PSerial()
zfile.PDeactivate() zfile.PDeactivate()
size, _, _, err := zfile.Size(ctx) size, sizePath, blkCov, err := zfile.Size(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -657,6 +1221,12 @@ func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err e ...@@ -657,6 +1221,12 @@ func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err e
loading: make(map[int64]*blkLoadState), loading: make(map[int64]*blkLoadState),
} }
// only head/ needs δFtail.
if head.rev == 0 {
// see "3) for */head/data the following invariant is maintained..."
head.bfdir.δFtail.Track(f.zfile, -1, sizePath, blkCov, nil)
}
return f, nil return f, nil
} }
...@@ -665,6 +1235,9 @@ func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err e ...@@ -665,6 +1235,9 @@ func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err e
// /(head|<rev>)/at -> readAt serves read. // /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) { func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) {
h.zheadMu.RLock() // TODO +fctx -> cancel
defer h.zheadMu.RUnlock()
return []byte(h.zconn.At().String()), nil return []byte(h.zconn.At().String()), nil
} }
...@@ -672,7 +1245,9 @@ func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) { ...@@ -672,7 +1245,9 @@ func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) {
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status { func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
at := head.rev at := head.rev
if at == 0 { if at == 0 {
head.zheadMu.RLock() // TODO +fctx -> cancel
at = head.zconn.At() at = head.zconn.At()
head.zheadMu.RUnlock()
} }
t := at.Time().Time t := at.Time().Time
...@@ -683,6 +1258,9 @@ func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fus ...@@ -683,6 +1258,9 @@ func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fus
// /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat. // /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status { func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
f.head.zheadMu.RLock() // TODO +fctx -> cancel
defer f.head.zheadMu.RUnlock()
f.getattr(out) f.getattr(out)
return fuse.OK return fuse.OK
} }
...@@ -700,14 +1278,53 @@ func (f *BigFile) getattr(out *fuse.Attr) { ...@@ -700,14 +1278,53 @@ func (f *BigFile) getattr(out *fuse.Attr) {
// FIXME gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode // FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO: // TODO:
// - Inode += .Mount() -> nodefs.Mount // - Inode += .Mount() -> nodefs.Mount
// - Mount: // - Mount:
// .Root() -> root Inode of the fs // .Root() -> root Inode of the fs
// .Connector() -> FileSystemConnector through which fs is mounted // .Connector() -> FileSystemConnector through which fs is mounted
var groot *Root
var gfsconn *nodefs.FileSystemConnector var gfsconn *nodefs.FileSystemConnector
// root of the filesystem is mounted here.
//
// we need to talk to kernel and lookup @<rev>/bigfile/<fid> before uploading
// data to kernel cache there. Referencing root of the filesystem via path is
// vulnerable to bugs wrt e.g. `mount --move` and/or mounting something else
// over wcfs. However keeping opened root fd will prevent wcfs to be unmounted,
// so we still have to reference the root via path.
var gmntpt string
// debugging (protected by zhead.W)
var gdebug = struct {
// .wcfs/zhead opens
// protected by groot.head.zheadMu
zheadSockTab map[*FileSock]struct{}
}{}
func init() {
gdebug.zheadSockTab = make(map[*FileSock]struct{})
}
// _wcfs_Zhead serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
fsNode
}
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// TODO(?) check flags
sk := NewFileSock()
sk.CloseRead()
groot.head.zheadMu.Lock() // TODO +fctx -> cancel
defer groot.head.zheadMu.Unlock()
// TODO del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
gdebug.zheadSockTab[sk] = struct{}{}
return sk.File(), fuse.OK
}
// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?) // TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)
func main() { func main() {
...@@ -760,12 +1377,21 @@ func _main() (err error) { ...@@ -760,12 +1377,21 @@ func _main() (err error) {
} }
defer xclose(zstor) defer xclose(zstor)
zwatchq := make(chan zodb.Event)
at0 := zstor.AddWatch(zwatchq)
defer zstor.DelWatch(zwatchq)
// TODO consider using zodbCacheControl for all connections // TODO consider using zodbCacheControl for all connections
// ( in addition to zhead, historic connections - that are used to access @rev - // ( in addition to zhead, historic connections - that are used to access @rev -
// also need to traverse BigFile.blktab btree ) // also need to traverse BigFile.blktab btree )
zdb := zodb.NewDB(zstor, &zodb.DBOptions{}) zdb := zodb.NewDB(zstor, &zodb.DBOptions{})
defer xclose(zdb) defer xclose(zdb)
zhead, err := xzodb.ZOpen(ctx, zdb, &zodb.ConnOptions{ zhead, err := xzodb.ZOpen(ctx, zdb, &zodb.ConnOptions{
At: at0,
// preserve zhead.cache across several transactions.
// see "ZODB cache control"
NoPool: true,
}) })
if err != nil { if err != nil {
return err return err
...@@ -785,6 +1411,7 @@ func _main() (err error) { ...@@ -785,6 +1411,7 @@ func _main() (err error) {
fsNode: newFSNode(fSticky), fsNode: newFSNode(fSticky),
head: head, head: head,
fileTab: make(map[zodb.Oid]*BigFile), fileTab: make(map[zodb.Oid]*BigFile),
δFtail: zdata.NewΔFtail(zhead.At(), zdb),
} }
head.bfdir = bfdir head.bfdir = bfdir
...@@ -800,6 +1427,17 @@ func _main() (err error) { ...@@ -800,6 +1427,17 @@ func _main() (err error) {
FsName: zurl, FsName: zurl,
Name: "wcfs", Name: "wcfs",
// We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size.
// XXX currently go-fuse caps MaxWrite to 128KB.
// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
MaxWrite: 2*1024*1024,
// TODO(?) tune MaxReadAhead? MaxBackground?
// OS cache that we populate with bigfile data is precious;
// we explicitly propagate ZODB invalidations into file invalidations.
ExplicitDataCacheControl: true,
DisableXAttrs: true, // we don't use DisableXAttrs: true, // we don't use
Debug: *debug, Debug: *debug,
} }
...@@ -808,15 +1446,27 @@ func _main() (err error) { ...@@ -808,15 +1446,27 @@ func _main() (err error) {
if err != nil { if err != nil {
return err return err
} }
gfsconn = fsconn // FIXME temp workaround (see ^^^) groot = root // FIXME temp workaround (see ^^^)
gfsconn = fsconn // FIXME ----//----
gmntpt = mntpt
// we require proper pagecache control (added to Linux 2.6.36 in 2010) // we require proper pagecache control (added to Linux 2.6.36 in 2010)
kinit := fssrv.KernelSettings() kinit := fssrv.KernelSettings()
kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor) kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor)
supports := kinit.SupportsNotify supports := kinit.SupportsNotify
if !supports(fuse.NOTIFY_STORE_CACHE) { if !(supports(fuse.NOTIFY_STORE_CACHE) && supports(fuse.NOTIFY_RETRIEVE_CACHE)) {
return fmt.Errorf("%s does not support pagecache control", kfuse) return fmt.Errorf("%s does not support pagecache control", kfuse)
} }
// make a bold warning if kernel does not support explicit cache invalidation
// (patch is in Linux 5.2+; see notes.txt -> "Notes on OS pagecache control")
if kinit.Flags & fuse.CAP_EXPLICIT_INVAL_DATA == 0 {
w1 := fmt.Sprintf("%s does not support explicit data cache invalidation", kfuse)
w2 := "-> performance will be AWFUL."
w3 := "-> you need kernel which includes git.kernel.org/linus/ad2ba64dd489."
w4 := "-> (Linux 5.2+, or nxd-fuse-dkms package installed from navytux.spb.ru/pkg)"
log.Error(w1); log.Error(w2); log.Error(w3); log.Error(w4)
fmt.Fprintf(os.Stderr, "W: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\n", w1, w2, w3, w4)
}
// add entries to / // add entries to /
mkdir(root, "head", head) mkdir(root, "head", head)
...@@ -828,6 +1478,17 @@ func _main() (err error) { ...@@ -828,6 +1478,17 @@ func _main() (err error) {
mkdir(root, ".wcfs", &_wcfs) mkdir(root, ".wcfs", &_wcfs)
mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl))) mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))
// .wcfs/zhead - special file channel that sends zhead.at.
//
// If a user opens it, it will start to get tids of through which
// zhead.at was, starting from the time when .wcfs/zhead was opened.
// There can be multiple openers. Once opened, the file must be read,
// as wcfs blocks waiting for data to be read when processing
// invalidations.
mkfile(&_wcfs, "zhead", &_wcfs_Zhead{
fsNode: newFSNode(fSticky),
})
// TODO handle autoexit // TODO handle autoexit
// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl // (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
// opened, so when all inodes has been forgotten - we know all wcfs.py clients exited) // opened, so when all inodes has been forgotten - we know all wcfs.py clients exited)
...@@ -852,6 +1513,15 @@ func _main() (err error) { ...@@ -852,6 +1513,15 @@ func _main() (err error) {
} }
// filesystem server is serving requests. // filesystem server is serving requests.
// run zwatcher and wait for it to complete.
// zwatcher completes either normally - due to filesystem unmount, or fails.
// if zwatcher fails - switch filesystem to return EIO instead of stale data.
err = root.zwatcher(serveCtx, zwatchq)
if errors.Cause(err) != context.Canceled {
log.Error(err)
log.Errorf("zwatcher failed -> switching filesystem to EIO mode (TODO)")
// TODO: switch fs to EIO mode
}
// wait for unmount // wait for unmount
// NOTE the kernel does not send FORGETs on unmount - but we don't need // NOTE the kernel does not send FORGETs on unmount - but we don't need
......
...@@ -40,7 +40,7 @@ from errno import EINVAL, ENOTCONN ...@@ -40,7 +40,7 @@ from errno import EINVAL, ENOTCONN
from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, b from golang import go, chan, select, func, defer, b
from golang import context, time from golang import context, time
from zodbtools.util import ashex as h from zodbtools.util import ashex as h, fromhex
import pytest; xfail = pytest.mark.xfail import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail from pytest import raises, fail
from wendelin.wcfs.internal import io, mm from wendelin.wcfs.internal import io, mm
...@@ -299,6 +299,7 @@ def timeout(parent=context.background()): # -> ctx ...@@ -299,6 +299,7 @@ def timeout(parent=context.background()): # -> ctx
# DF represents a change in files space. # DF represents a change in files space.
# it corresponds to ΔF in wcfs.go .
class DF: class DF:
# .rev tid # .rev tid
# .byfile {} ZBigFile -> DFile # .byfile {} ZBigFile -> DFile
...@@ -307,6 +308,7 @@ class DF: ...@@ -307,6 +308,7 @@ class DF:
dF.byfile = {} dF.byfile = {}
# DFile represents a change to one file. # DFile represents a change to one file.
# it is similar to ΔFile in wcfs.go .
class DFile: class DFile:
# .rev tid # .rev tid
# .ddata {} blk -> data # .ddata {} blk -> data
...@@ -415,6 +417,10 @@ class tDB(tWCFS): ...@@ -415,6 +417,10 @@ class tDB(tWCFS):
t.tail = t.root._p_jar.db().storage.lastTransaction() t.tail = t.root._p_jar.db().storage.lastTransaction()
t.dFtail = [] # of DF; head = dFtail[-1].rev t.dFtail = [] # of DF; head = dFtail[-1].rev
# fh(.wcfs/zhead) + history of zhead read from there
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
t._wc_zheadv = []
# tracked opened tFiles # tracked opened tFiles
t._files = set() t._files = set()
...@@ -443,6 +449,7 @@ class tDB(tWCFS): ...@@ -443,6 +449,7 @@ class tDB(tWCFS):
for tf in t._files.copy(): for tf in t._files.copy():
tf.close() tf.close()
assert len(t._files) == 0 assert len(t._files) == 0
t._wc_zheadfh.close()
# open opens wcfs file corresponding to zf@at and starts to track it. # open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details. # see returned tFile for details.
...@@ -521,15 +528,18 @@ class tDB(tWCFS): ...@@ -521,15 +528,18 @@ class tDB(tWCFS):
# _wcsync makes sure wcfs is synchronized to latest committed transaction. # _wcsync makes sure wcfs is synchronized to latest committed transaction.
def _wcsync(t): def _wcsync(t):
# XXX stub: unmount/remount + close/reopen files until wcfs supports invalidations while len(t._wc_zheadv) < len(t.dFtail):
files = t._files.copy() l = t._wc_zheadfh.readline()
for tf in files: #print('> zhead read: %r' % l)
tf.close() l = l.rstrip('\n')
tWCFS.close(t) wchead = tAt(t, fromhex(l))
tWCFS.__init__(t) i = len(t._wc_zheadv)
for tf in files: if wchead != t.dFtail[i].rev:
tf.__init__(t, tf.zf, tf.at) raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, wchead, t.dFtail[i].rev))
assert len(t._files) == len(files) t._wc_zheadv.append(wchead)
# head/at = last txn of whole db
assert t.wc._read("head/at") == h(t.head)
# tFile provides testing environment for one bigfile opened on wcfs. # tFile provides testing environment for one bigfile opened on wcfs.
...@@ -763,7 +773,7 @@ def _blkDataAt(t, zf, blk, at): # -> (data, rev) ...@@ -763,7 +773,7 @@ def _blkDataAt(t, zf, blk, at): # -> (data, rev)
# ---- actual tests to access data ---- # ---- actual tests to access data ----
# exercise wcfs functionality # exercise wcfs functionality
# plain data access. # plain data access + wcfs handling of ZODB invalidations.
@func @func
def test_wcfs_basic(): def test_wcfs_basic():
t = tDB(); zf = t.zfile t = tDB(); zf = t.zfile
...@@ -783,20 +793,20 @@ def test_wcfs_basic(): ...@@ -783,20 +793,20 @@ def test_wcfs_basic():
at1 = t.commit(zf, {2:'c1'}) at1 = t.commit(zf, {2:'c1'})
f.assertCache([0,0,0]) # initially not cached f.assertCache([0,0,0]) # initially not cached
f.assertData (['','','c1']) # TODO + mtime=t.head f.assertData (['','','c1'], mtime=t.head)
# >>> (@at2) commit again -> we can see both latest and snapshotted states # >>> (@at2) commit again -> we can see both latest and snapshotted states
# NOTE blocks e(4) and f(5) will be accessed only in the end # NOTE blocks e(4) and f(5) will be accessed only in the end
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'}) at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
# f @head # f @head
#f.assertCache([1,1,0,0,0,0]) TODO enable after wcfs supports invalidations f.assertCache([1,1,0,0,0,0])
f.assertData (['','', 'c2', 'd2', 'x','x']) # TODO + mtime=t.head f.assertData (['','', 'c2', 'd2', 'x','x'], mtime=t.head)
f.assertCache([1,1,1,1,0,0]) f.assertCache([1,1,1,1,0,0])
# f @at1 # f @at1
f1 = t.open(zf, at=at1) f1 = t.open(zf, at=at1)
#f1.assertCache([0,0,1]) TODO enable after wcfs supports invalidations f1.assertCache([0,0,1])
f1.assertData (['','','c1']) # TODO + mtime=at1 f1.assertData (['','','c1']) # TODO + mtime=at1
...@@ -804,22 +814,59 @@ def test_wcfs_basic(): ...@@ -804,22 +814,59 @@ def test_wcfs_basic():
f2 = t.open(zf, at=at2) f2 = t.open(zf, at=at2)
at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'}) at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'})
#f.assertCache([0,1,0,1,0,0]) TODO enable after wcfs supports invalidations f.assertCache([0,1,0,1,0,0])
# f @head is opened again -> cache must not be lost
f_ = t.open(zf)
f_.assertCache([0,1,0,1,0,0])
f_.close()
f.assertCache([0,1,0,1,0,0])
# f @head # f @head
#f.assertCache([0,1,0,1,0,0]) TODO enable after wcfs supports invalidations f.assertCache([0,1,0,1,0,0])
f.assertData (['a3','','c3','d2','x','x']) # TODO + mtime=t.head f.assertData (['a3','','c3','d2','x','x'], mtime=t.head)
# f @at2 # f @at2
# NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed # NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed
#f2.assertCache([0,0,1,0,0,0]) TODO enable after wcfs supports invalidations f2.assertCache([0,0,1,0,0,0])
f2.assertData (['','','c2','d2','','f2']) # TODO mtime=at2 f2.assertData (['','','c2','d2','','f2']) # TODO mtime=at2
# f @at1 # f @at1
#f1.assertCache([1,1,1]) TODO enable after wcfs supports invalidations f1.assertCache([1,1,1])
f1.assertData (['','','c1']) # TODO mtime=at1 f1.assertData (['','','c1']) # TODO mtime=at1
# >>> f close / open again -> cache must not be lost
# XXX a bit flaky since OS can evict whole f cache under pressure
f.assertCache([1,1,1,1,0,0])
f.close()
f = t.open(zf)
if f.cached() != [1,1,1,1,0,0]:
assert sum(f.cached()) > 4*1/2 # > 50%
# verify all blocks
f.assertData(['a3','','c3','d2','','f3'])
f.assertCache([1,1,1,1,1,1])
# verify how wcfs processes ZODB invalidations when hole becomes a block with data.
@func
def test_wcfs_basic_hole2zblk():
t = tDB(); zf = t.zfile
defer(t.close)
f = t.open(zf)
t.commit(zf, {2:'c1'}) # b & a are holes
f.assertCache([0,0,0])
f.assertData(['','','c1'])
t.commit(zf, {1:'b2'}) # hole -> zblk
f.assertCache([1,0,1])
f.assertData(['','b2','c1'])
# TODO ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2
# TODO ZBlk moved from blk1 -> blk2 ; for the same file and for file1 -> file2
# verify that read after file size returns (0, ok) # verify that read after file size returns (0, ok)
# (the same behaviour as on e.g. ext4 and as requested by posix) # (the same behaviour as on e.g. ext4 and as requested by posix)
@func @func
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment