Commit 1ffd04a5 authored by Kirill Smelkov

.

parent 0ec8ce51
- doc: notes on how things are organized in wendelin.core 2
wcfs:
- SIGSEGV is used only to track writes
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Module wcfs.py provides python gateway for spawning and interoperating with wcfs server.
Join(zurl) joins wcfs server for ZODB at zurl and returns WCFS object that
represents filesystem-level connection to joined wcfs server. If wcfs server
for zurl is not yet running, it will be automatically started if join is given
`autostart=True` option.
The rest of wcfs.py merely wraps C++ wcfs client package:
- `WCFS` represents filesystem-level connection to wcfs server.
- `Conn` represents logical connection that provides view of data on wcfs
filesystem as of particular database state.
- `FileH` represents isolated file view under Conn.
- `Mapping` represents one memory mapping of FileH.
A path from WCFS to Mapping is as follows:
WCFS.connect(at) -> Conn
Conn.open(foid) -> FileH
FileH.mmap([blk_start +blk_len)) -> Mapping
Classes in wcfs.py logically mirror classes in ZODB:
wcfs.WCFS <-> ZODB.DB
wcfs.Conn <-> ZODB.Connection
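
For example, a minimal sketch of going through that path (zurl, at and foid
are hypothetical placeholders; cleanup of Conn/FileH/Mapping is omitted - see
wcfs/client/wcfs.h for the full API):

    wc   = join(zurl, autostart=True)   # filesystem-level connection
    conn = wc.connect(at)               # view of data as of database state at
    fh   = conn.open(foid)              # isolated view of one ZBigFile
    m    = fh.mmap(0, 4)                # mapping covering blocks [0, 4)
    ...                                 # access the file data through m
    wc.close()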
Please see wcfs/client/wcfs.h for more thorough overview and further details.
Environment variables
---------------------
The following environment variables can be used to control wcfs.py client:
$WENDELIN_CORE_WCFS_AUTOSTART
    yes     join: spawn wcfs server if no one was found and no explicit
                  autostart=X was given   (default)
    no      join: don't spawn wcfs server unless explicitly requested via autostart=True
$WENDELIN_CORE_WCFS_OPTIONS
    ""      join: additional options to pass to wcfs server if one is spawned
"""
from __future__ import print_function, absolute_import
import os, sys, hashlib, tempfile, subprocess, time, stat
import logging as log
from os.path import dirname
from errno import ENOENT, EEXIST
from golang import chan, select, default, func
from golang import sync, context
from golang.gcompat import qq
from persistent import Persistent
from zodbtools.util import ashex as h
from .client._wcfs import \
PyWCFS as _WCFS, \
PyWatchLink as WatchLink, \
PyPinReq as PinReq
# WCFS represents filesystem-level connection to wcfs server.
#
# Use join to create it.
#
# The primary way to access wcfs is to open logical connection viewing on-wcfs
# data as of particular database state, and use that logical connection to create
# base-layer mappings. See .connect and Conn in C++ API for details.
#
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
#
# WCFS logically mirrors ZODB.DB .
class WCFS(_WCFS):
# .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
# ._njoin this connection was returned for so many joins
# ._proc wcfs process if it was opened by this WCFS | None
pass
# ---- WCFS raw file access (primarily for tests) ----
# _path returns path for object on wcfs.
# - str: wcfs root + obj;
# - Persistent: wcfs root + (head|@<at>)/bigfile/obj
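# ( schematic examples, with <mntpt> standing for wc.mountpoint and zfile for
#   a ZBigFile object:
#     wc._path("head/at")       -> <mntpt>/head/at
#     wc._path(zfile)           -> <mntpt>/head/bigfile/<hex oid>
#     wc._path(zfile, at=at)    -> <mntpt>/@<hex at>/bigfile/<hex oid>  )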
@func(WCFS)
def _path(wc, obj, at=None):
if isinstance(obj, Persistent):
#assert type(obj) is ZBigFile XXX import cycle
objtypestr = type(obj).__module__ + "." + type(obj).__name__
assert objtypestr == "wendelin.bigfile.file_zodb.ZBigFile", objtypestr
head = "head/" if at is None else ("@%s/" % h(at))
obj = "%s/bigfile/%s" % (head, h(obj._p_oid))
at = None
assert isinstance(obj, str)
assert at is None # must not be used with str
return os.path.join(wc.mountpoint, obj)
# _read reads file corresponding to obj on wcfs.
@func(WCFS)
def _read(wc, obj, at=None):
path = wc._path(obj, at=at)
with open(path, 'rb') as f:
return f.read()
# _stat stats file corresponding to obj on wcfs.
@func(WCFS)
def _stat(wc, obj, at=None):
path = wc._path(obj, at=at)
return os.stat(path)
# _open opens file corresponding to obj on wcfs.
@func(WCFS)
def _open(wc, obj, mode='rb', at=None):
path = wc._path(obj, at=at)
return open(path, mode, 0) # unbuffered
# ---- join/run wcfs ----
_wcmu = sync.Mutex()
_wcregistry = {} # mntpt -> WCFS
@func(WCFS)
def __init__(wc, mountpoint, fwcfs, proc):
wc.mountpoint = mountpoint
wc._fwcfs = fwcfs
wc._njoin = 1
wc._proc = proc
# close must be called to release joined connection after it is no longer needed.
@func(WCFS)
def close(wc):
with _wcmu:
wc._njoin -= 1
if wc._njoin == 0:
# NOTE not unmounting wcfs - it either runs as separate service, or
# is spawned on demand with -autoexit.
wc._fwcfs.close()
del _wcregistry[wc.mountpoint]
# _default_autostart returns default autostart setting for join.
#
# Out-of-the-box we want wcfs to be automatically started, to ease developer
# experience when wendelin.core is installed standalone. However, in environments
# like SlapOS it is preferable to start and monitor the wcfs service explicitly.
# SlapOS & co. should thus set $WENDELIN_CORE_WCFS_AUTOSTART=no.
def _default_autostart():
autostart = os.environ.get("WENDELIN_CORE_WCFS_AUTOSTART", "yes")
autostart = autostart.lower()
return {"yes": True, "no": False}[autostart]
# join connects to wcfs server for ZODB @ zurl.
#
# If wcfs for that zurl was already started, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True.
#
# For the same zurl join returns the same WCFS object.
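# Example (a sketch; the zurl value is hypothetical):
#
#   wc = join("zeo://localhost:9001", autostart=True)
#   try:
#       ...                     # e.g. wc.connect(at), or wc._read(obj) in tests
#   finally:
#       wc.close()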
def join(zurl, autostart=_default_autostart()): # -> WCFS
mntpt = _mntpt_4zurl(zurl)
with _wcmu:
# check if we already have connection to wcfs server from this process
wc = _wcregistry.get(mntpt)
if wc is not None:
wc._njoin += 1
return wc
# no. try opening .wcfs - if we succeed - wcfs is already mounted.
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno != ENOENT:
raise
else:
# already have it
wc = WCFS(mntpt, f, None)
_wcregistry[mntpt] = wc
return wc
if not autostart:
raise RuntimeError("wcfs: join %s: server not started" % zurl)
# start wcfs, telling it to automatically exit when there is no client activity.
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
optv_extra = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split()
return _start(zurl, "-autoexit", *optv_extra)
# _start starts wcfs server for ZODB @ zurl.
#
# optv can be optionally given to pass flags to wcfs.
# called under _wcmu.
def _start(zurl, *optv): # -> WCFS
mntpt = _mntpt_4zurl(zurl)
log.info("wcfs: starting for %s ...", zurl)
# XXX errctx "wcfs: start"
# spawn wcfs and wait till filesystem-level access to it is ready
wc = WCFS(mntpt, None, None)
wg = sync.WorkGroup(context.background())
fsready = chan(dtype='C.structZ')
def _(ctx):
# XXX errctx "spawn"
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
proc = subprocess.Popen(argv, close_fds=True)
while 1:
ret = proc.poll()
if ret is not None:
raise RuntimeError("exited with %s" % ret)
_, _rx = select(
ctx.done().recv, # 0
fsready.recv, # 1
default, # 2
)
if _ == 0:
proc.terminate()
raise ctx.err()
if _ == 1:
# startup was ok - don't monitor spawned wcfs any longer
wc._proc = proc
return
time.sleep(0.1)
wg.go(_)
def _(ctx):
# XXX errctx "waitmount"
while 1:
try:
f = open("%s/.wcfs/zurl" % mntpt)
except IOError as e:
if e.errno != ENOENT:
raise
else:
dotwcfs = f.read()
if dotwcfs != zurl:
raise RuntimeError(".wcfs/zurl != zurl (%s != %s)" % (qq(dotwcfs), qq(zurl)))
wc._fwcfs = f
fsready.close()
return
_, _rx = select(
ctx.done().recv, # 0
default, # 1
)
if _ == 0:
raise ctx.err()
time.sleep(0.1)
wg.go(_)
wg.wait()
assert mntpt not in _wcregistry
_wcregistry[mntpt] = wc
return wc
# ---- misc ----
# _wcfs_exe returns path to wcfs executable.
def _wcfs_exe():
return '%s/wcfs' % dirname(__file__)
# _mntpt_4zurl returns wcfs should-be mountpoint for ZODB @ zurl.
#
# it also makes sure the mountpoint exists.
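# ( e.g. with tempfile.gettempdir() == "/tmp" and a hypothetical
#   zurl = "zeo://localhost:9001" the mountpoint is
#   "/tmp/wcfs/" + hashlib.sha1("zeo://localhost:9001").hexdigest() )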
def _mntpt_4zurl(zurl):
m = hashlib.sha1()
m.update(zurl)
# mkdir /tmp/wcfs with sticky bit. This way multiple users can create subdirectories inside.
wcfsroot = "%s/wcfs" % (tempfile.gettempdir())
wcfsmode = 0777 | stat.S_ISVTX
if _mkdir_p(wcfsroot):
os.chmod(wcfsroot, wcfsmode)
else:
# migration workaround for the situation when /tmp/wcfs was created by
# code that did not yet set sticky bit.
_ = os.stat(wcfsroot)
if _.st_uid == os.getuid():
if _.st_mode != wcfsmode:
os.chmod(wcfsroot, wcfsmode)
mntpt = "%s/%s" % (wcfsroot, m.hexdigest())
_mkdir_p(mntpt)
return mntpt
# mkdir -p.
def _mkdir_p(path, mode=0777): # -> created(bool)
try:
os.makedirs(path, mode)
except OSError as e:
if e.errno != EEXIST:
raise
return False
return True
# serve starts and runs wcfs server for ZODB @ zurl.
#
# it mounts wcfs at a location that is in 1-1 correspondence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error).
#
# it is an error if wcfs was already started.
#
# optv is list of options to pass to wcfs server.
# if exec_ is True, wcfs is not spawned, but executed into.
#
# serve(zurl, exec_=False).
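# ( e.g. serve(zurl, []) mounts wcfs for zurl and blocks until the filesystem
#   is unmounted or wcfs exits with an error )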
def serve(zurl, optv, exec_=False):
mntpt = _mntpt_4zurl(zurl)
# try opening .wcfs - it is an error if we can do it.
# XXX -> option to wcfs itself to verify wcfs/something is already mounted?
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno != ENOENT:
raise
else:
f.close()
raise RuntimeError("wcfs: start %s: already started" % zurl)
# seems to be ok to start
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
if not exec_:
subprocess.check_call(argv, close_fds=True)
else:
os.execv(argv[0], argv)
# if called as main just -> exec serve()
def _usage(w):
progname = os.path.basename(sys.argv[0])
print("Wcfs serves WCFS filesystem for ZODB at zurl for wendelin.core .\n", file=w)
print("Usage: %s [-h | wcfs.go options] zurl" % progname, file=w)
sys.exit(2)
def main():
argv = sys.argv[1:]
if len(argv) == 0:
_usage(sys.stderr)
if argv[0] == '-h':
os.execv(_wcfs_exe(), [_wcfs_exe(), '-h'])
zurl = argv[-1] # -a -b zurl -> zurl
optv = argv[:-1] # -a -b zurl -> -a -b
serve(zurl, optv, exec_=True)
==============================================
Additional notes to documentation in wcfs.go
==============================================
This file contains notes additional to usage documentation and internal
organization overview in wcfs.go .
Notes on OS pagecache control
=============================
The cache of snapshotted bigfile can be pre-made hot if invalidated region
was already in pagecache of head/bigfile/file:
- we can retrieve a region from pagecache of head/file with FUSE_NOTIFY_RETRIEVE.
- we can store that retrieved data into pagecache region of @<revX>/ with FUSE_NOTIFY_STORE.
- we can invalidate a region from pagecache of head/file with FUSE_NOTIFY_INVAL_INODE.

We have to disable FUSE_AUTO_INVAL_DATA to tell the kernel that we are fully
responsible for invalidating the pagecache. If we don't, the kernel will be
clearing the whole cache of head/file on e.g. its mtime change.

Note: disabling FUSE_AUTO_INVAL_DATA does not fully prevent the kernel from
automatically invalidating the pagecache - e.g. it will still invalidate the
whole cache on file size changes:
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/fs/fuse/inode.c?id=e0bc833d10#n233

It was hoped that we could work around this by using writeback mode (see !is_wb
in the link above), but it turned out that in writeback mode the kernel indeed
does not invalidate the data cache on file size change, but neither does it
allow the filesystem to set the size due to an external event (see
https://git.kernel.org/linus/8373200b12 "fuse: Trust kernel i_size only").
This prevents us from using the writeback workaround, as we cannot even update
the file from being empty to having some data.

-> we made a patch for FUSE that adds a proper flag with which the filesystem
server can tell the kernel it is fully responsible for invalidating the
pagecache. The patch is part of Linux 5.2:
https://git.kernel.org/linus/ad2ba64dd489

Invalidations to wcfs clients are delayed until block access
============================================================
Initially it was planned that wcfs would send invalidation messages to its
clients right after receiving an invalidation message from ZODB at transaction
boundary time. That simplifies the logic, but requires that, for a particular
file, wcfs send clients the whole range of where the file was changed.

Emitting the whole δR right at transaction-boundary time requires keeping the
whole ZBigFile.blktab index in RAM. Even though from a space point of view this
is somewhat acceptable (~ 0.01% of whole-file data size, i.e. ~ 128MB of index
for ~ 1TB of data), it is not good from a time-overhead point of view - the
initial open of a file would potentially be slow, since a full blktab scan -
including Tree _and_ Bucket nodes - would be required.

-> we took the approach where invalidation about a block is sent to a client
lazily, only when the block is actually accessed.

XXX building δFtail lazily while serving FUSE reads during the scope of one
transaction is not trivial and creates concurrency bottlenecks if a simple
locking scheme is used, the main difficulty being to populate the δBtree
tracking set lazily. However, as a first approach we can still build the
complete tracking set for a BTree at the time of file open: we need to scan
through all trees, but _not_ buckets - this way we will know the oids of all
nodes, trees _and_ buckets, while avoiding loading the buckets themselves,
which makes this approach practical: with default LOBTree settings (1 bucket =
60·objects, 1 tree = 500·buckets) it requires ~ 20 trees to cover 1TB of data,
and we can scan those trees very quickly even if doing so serially. For 1PB of
data it requires scanning ~ 10⁴ trees; if the RTT to load 1 object is ~ 1ms,
this becomes ~ 10 seconds if done serially, but much less if we load all those
tree objects in parallel. Still, the number of trees to scan is linear in the
amount of data, and it would be good to address the shortcoming of doing a
whole-file index scan later.
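
As a back-of-the-envelope check of the above numbers, here is a small Python
sketch (it assumes the LOBTree parameters quoted above and a 2MiB ZBigFile
block size; the figures are order-of-magnitude estimates only)::

    blksize    = 2*1024*1024            # bytes covered by one ZBlk object (assumed)
    per_bucket = 60                     # ZBlk references per bucket
    per_tree   = 500 * per_bucket       # blocks referenced via one tree
    tree_span  = per_tree * blksize     # ~ 60GB of data covered per tree

    for name, size in [("1TB", 1 << 40), ("1PB", 1 << 50)]:
        ntree = -(-size // tree_span)   # ceiling division
        # ~ 1ms RTT per object load, loading tree nodes serially
        print("%s -> ~%d trees, ~%.2fs serial scan" % (name, ntree, ntree * 1e-3))
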
Changing mmapping while under pagefault is possible
===================================================
We can change a mapping while a page from it is under pagefault:
- the kernel, upon handling pagefault, queues read request to filesystem
server. As of Linux 4.20 this is done _with_ holding client->mm->mmap_sem:
kprobe:fuse_readpages (client->mm->mmap_sem.count: 1)
fuse_readpages+1
read_pages+109
__do_page_cache_readahead+401
filemap_fault+635
__do_fault+31
__handle_mm_fault+3403
handle_mm_fault+220
__do_page_fault+598
page_fault+30
- however the read request is queued to be performed asynchronously -
the kernel does not wait for it in fuse_readpages, because
* git.kernel.org/linus/c1aa96a5,
* git.kernel.org/linus/9cd68455,
* and go-fuse initially negotiating CAP_ASYNC_READ to the kernel.
- the kernel then _releases_ client->mm->mmap_sem and then waits
for to-read pages to become ready:
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n2411
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n2457
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n1301
- the filesystem server, upon receiving the read request, can manipulate the
client's address space. This requires write-locking client->mm->mmap_sem,
but we can be sure it won't deadlock because the kernel releases it
before waiting (see the previous point).
In practice the manipulation is done by another client thread, because
on Linux it is not possible to change the mm of another process. However,
the main point here is that the manipulation is possible because
there will be no deadlock on client->mm->mmap_sem.
For reference, here is how the filesystem server reply looks under trace:
kprobe:fuse_readpages_end
fuse_readpages_end+1
request_end+188
fuse_dev_do_write+1921
fuse_dev_write+78
do_iter_readv_writev+325
do_iter_write+128
vfs_writev+152
do_writev+94
do_syscall_64+85
entry_SYSCALL_64_after_hwframe+68
And here is a test program that demonstrates that it is possible to change a
mmapping while a pagefault to it is in progress:
https://lab.nexedi.com/kirr/go-fuse/commit/f822c9db

Starting from Linux 5.1, mmap_sem should generally be released while doing any IO:
https://git.kernel.org/linus/6b4c9f4469
but for earlier kernels the analysis above remains FUSE-specific.

The property that changing a mmapping while under pagefault is possible is
verified by the wcfs testsuite in the `test_wcfs_remmap_on_pin` test.

Client cannot be ptraced while under pagefault
==============================================
We cannot use ptrace to run code on a client thread that is under pagefault:
the kernel sends SIGSTOP to interrupt the tracee, but the signal will be
processed only when the process returns from kernel space, e.g. here:
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/x86/entry/common.c?id=v4.19-rc8-151-g23469de647c4#n160

This way the tracer won't receive the obligatory notification that the tracee
stopped (via wait...), and even though ptrace(ATTACH) succeeds, all other
ptrace commands will fail:
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/kernel/ptrace.c?id=v4.19-rc8-151-g23469de647c4#n1140
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/kernel/ptrace.c?id=v4.19-rc8-151-g23469de647c4#n207

My original idea was to use ptrace to run code in the client process to change
its memory mappings while the triggering process is under pagefault/read to
wcfs, and the above shows it won't work - trying to ptrace the client from
under wcfs will just block forever (the kernel will be waiting for the read
operation to finish before serving ptrace, and the read will in turn be
waiting for the ptrace stop to complete = deadlock).

Kernel locks page on read/cache store/... - we have to be careful not to deadlock
=================================================================================
The kernel, when doing FUSE operations, locks the corresponding pages. For
example it locks the page it is going to read data into before issuing the
FUSE read request. Correspondingly, on e.g. cache store, the kernel also locks
the page where data has to be stored.

It is easy to deadlock if we don't take these locks into account. For example,
if we try to upload data to the kernel pagecache from inside serving a read
request, it can deadlock.

Another case that needs care is the interaction between uploadBlk and
zwatcher: zheadMu, being an RWMutex, does not allow new RLocks to be taken
once a Lock request has been issued. Thus the following scenario is possible::

      uploadBlk            os.Read              zwatcher

                           page.Lock
      zheadMu.RLock
                                                zheadMu.Lock
      page.Lock
                           zheadMu.RLock

- zwatcher is waiting for uploadBlk to release zheadMu;
- uploadBlk is waiting for os.Read to release page;
- os.Read is waiting for zwatcher to release zheadMu;
- deadlock.
To avoid such deadlocks zwatcher asks OS cache uploaders to pause while it is
running, and retries taking zheadMu.Lock until all uploaders are indeed paused.
digraph {
subgraph {
rank=same;
ordering=in;
wcfs [label="wcfs"]
invProto [label="open/isolation\nprotocol", style=filled fillcolor=grey95]
client [label="client", style=filled fillcolor=grey97]
wcfs -> invProto;
invProto -> client [dir=back]; // XXX = invProto <- client
}
// wcfs -> wcfs_simple;
// wcfs -> Sinvtree;
// wcfs -> δR;
wcfs -> liveCacheControl;
wcfs -> autoexit [color=grey];
wcfs -> wcfsInvProcess;
wcfs -> wcfsRead;
wcfs -> wcfsGC [color=grey];
wcfsInvProcess -> ZODB_go_inv;
wcfsInvProcess -> zconnCacheGet;
wcfsInvProcess -> zobj2file;
wcfsInvProcess -> δFtail;
wcfsInvProcess -> fuseRetrieveCache;
wcfsInvProcess -> _wcfs_zhead;
ZODB_go_inv -> fs1_go_inv;
ZODB_go_inv -> zeo_go_inv;
ZODB_go_inv -> neo_go_inv;
ZODB_go_inv -> zcache_go_inv [style=dashed, color=grey]; // wcfs works without raw cache now
wcfsRead -> blktabGet;
wcfsRead -> δFtail;
wcfsRead -> setupWatch;
wcfsRead -> headWatch;
zobj2file -> zblk2file;
zobj2file -> zbtree2file;
zbtree2file -> δBTree [color=grey];
// wcfs_simple -> Btree_read;
// wcfs_simple -> ZBlk_read;
// wcfs_simple -> autoexit;
client -> wcfsRead;
client -> setupWatch;
client -> clientInvHandle;
// client -> δR;
client -> nowcfs;
// client -> zodburl;
// client -> wcfs_spawn;
clientInvHandle -> headWatch;
headWatch -> fileSock;
_wcfs_zhead -> fileSock;
// Btree_read -> ZODB_read;
// ZBlk_read -> ZODB_read;
// ZODB_read -> ogorek_persref;
// wcfs_simple [label="wcfs no\ninvalidations", style=filled fillcolor=grey95]
// wcfs_spawn [label="spawn wcfs", style=filled fillcolor=lightyellow]
nowcfs [label="!wcfs mode", style=filled fillcolor=grey95]
wcfsInvProcess [label="process\nZODB invalidations", style=filled fillcolor=grey95]
zconnCacheGet [label="zonn.\n.Cache.Get", style=filled fillcolor=lightyellow]
zobj2file [label="Z* → file/[]#blk", style=filled fillcolor=grey95]
zblk2file [label="ZBlk*\n↓\nfile/[]#blk", style=filled fillcolor=lightyellow]
zbtree2file [label="BTree/Bucket\n↓\nfile/[]#blk"]
δBTree [label="δ(BTree)", style=filled fillcolor=grey95]
fuseRetrieveCache [label="FUSE:\nretrieve cache", style=filled fillcolor=lightyellow]
_wcfs_zhead [label=".wcfs/\nzhead", style=filled fillcolor=lightyellow]
wcfsRead [label="read(#blk)", style=filled fillcolor=grey95]
blktabGet [label="blktab.Get(#blk):\nmanually + → ⌈rev(#blk)⌉", style=filled fillcolor=grey95]
δFtail [style=filled fillcolor=lightyellow]
setupWatch [label="watches:\nregister/maint", style=filled fillcolor=grey95]
clientInvHandle [label="process\n#blk invalidations", style=filled fillcolor=grey95]
headWatch [label="#blk ← head/watch", style=filled fillcolor=grey95]
fileSock [label="FileSock", style=filled fillcolor=lightyellow]
ZODB_go_inv [label="ZODB/go\ninvalidations", style=filled fillcolor=grey95]
fs1_go_inv [label="fs1/go\ninvalidations", style=filled fillcolor=lightyellow]
zeo_go_inv [label="zeo/go\ninvalidations", style=filled fillcolor=lightyellow]
neo_go_inv [label="neo/go\ninvalidations"]
zcache_go_inv [label="ZCache/go\n←watchq", color=grey, fontcolor=grey]
// Btree_read [label="BTree read", style=filled fillcolor=lightyellow]
// ZBlk_read [label="ZBigFile / ZBlk* read", style=filled fillcolor=lightyellow]
// ZODB_read [label="ZODB deserialize object", style=filled fillcolor=lightyellow]
// ogorek_persref [label="ogórek:\npersistent references", style=filled fillcolor=lightyellow];
// Sinvtree [label="server: inv. tree"]
// δR [label="δR encoding"]
// zodburl [label="zstor -> zurl", style=filled fillcolor=grey95]
wcfsGC [label="GC\n@rev/"]
liveCacheControl [label="ZODB/go\nLiveCache fix", style=filled fillcolor=grey95]
autoexit [label="autoexit\nif !activity"]
}
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<!-- Generated by graphviz version 2.43.0 (0)
-->
<!-- Title: %3 Pages: 1 -->
<svg width="1885pt" height="424pt"
viewBox="0.00 0.00 1885.37 424.17" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 420.17)">
<title>%3</title>
<polygon fill="white" stroke="transparent" points="-4,4 -4,-420.17 1881.37,-420.17 1881.37,4 -4,4"/>
<!-- wcfs -->
<g id="node1" class="node">
<title>wcfs</title>
<ellipse fill="none" stroke="black" cx="1063.23" cy="-389.3" rx="31.7" ry="18"/>
<text text-anchor="middle" x="1063.23" y="-385.6" font-family="Times,serif" font-size="14.00">wcfs</text>
</g>
<!-- invProto -->
<g id="node2" class="node">
<title>invProto</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1298.23" cy="-389.3" rx="82.05" ry="26.74"/>
<text text-anchor="middle" x="1298.23" y="-393.1" font-family="Times,serif" font-size="14.00">open/isolation</text>
<text text-anchor="middle" x="1298.23" y="-378.1" font-family="Times,serif" font-size="14.00">protocol</text>
</g>
<!-- wcfs&#45;&gt;invProto -->
<g id="edge1" class="edge">
<title>wcfs&#45;&gt;invProto</title>
<path fill="none" stroke="black" d="M1095.35,-389.3C1132.06,-389.3 1168.76,-389.3 1205.46,-389.3"/>
<polygon fill="black" stroke="black" points="1205.81,-392.8 1215.81,-389.3 1205.81,-385.8 1205.81,-392.8"/>
</g>
<!-- liveCacheControl -->
<g id="node4" class="node">
<title>liveCacheControl</title>
<ellipse fill="#f2f2f2" stroke="black" cx="625.23" cy="-299.56" rx="79.81" ry="26.74"/>
<text text-anchor="middle" x="625.23" y="-303.36" font-family="Times,serif" font-size="14.00">ZODB/go</text>
<text text-anchor="middle" x="625.23" y="-288.36" font-family="Times,serif" font-size="14.00">LiveCache fix</text>
</g>
<!-- wcfs&#45;&gt;liveCacheControl -->
<g id="edge3" class="edge">
<title>wcfs&#45;&gt;liveCacheControl</title>
<path fill="none" stroke="black" d="M1032.66,-384.01C971.69,-375.21 830.78,-353.62 714.23,-326.43 707.22,-324.8 699.95,-322.95 692.72,-321"/>
<polygon fill="black" stroke="black" points="693.49,-317.59 682.92,-318.31 691.63,-324.33 693.49,-317.59"/>
</g>
<!-- autoexit -->
<g id="node5" class="node">
<title>autoexit</title>
<ellipse fill="none" stroke="black" cx="787.23" cy="-299.56" rx="63.78" ry="26.74"/>
<text text-anchor="middle" x="787.23" y="-303.36" font-family="Times,serif" font-size="14.00">autoexit</text>
<text text-anchor="middle" x="787.23" y="-288.36" font-family="Times,serif" font-size="14.00">if !activity</text>
</g>
<!-- wcfs&#45;&gt;autoexit -->
<g id="edge4" class="edge">
<title>wcfs&#45;&gt;autoexit</title>
<path fill="none" stroke="grey" d="M1035.37,-380.34C995.96,-368.96 921.76,-347.14 859.23,-326.43 854.31,-324.81 849.22,-323.07 844.12,-321.31"/>
<polygon fill="grey" stroke="grey" points="844.95,-317.89 834.36,-317.88 842.64,-324.49 844.95,-317.89"/>
</g>
<!-- wcfsInvProcess -->
<g id="node6" class="node">
<title>wcfsInvProcess</title>
<ellipse fill="#f2f2f2" stroke="black" cx="978.23" cy="-299.56" rx="109.7" ry="26.74"/>
<text text-anchor="middle" x="978.23" y="-303.36" font-family="Times,serif" font-size="14.00">process</text>
<text text-anchor="middle" x="978.23" y="-288.36" font-family="Times,serif" font-size="14.00">ZODB invalidations</text>
</g>
<!-- wcfs&#45;&gt;wcfsInvProcess -->
<g id="edge5" class="edge">
<title>wcfs&#45;&gt;wcfsInvProcess</title>
<path fill="none" stroke="black" d="M1048.42,-373.02C1037.82,-362.08 1023.16,-346.95 1009.98,-333.34"/>
<polygon fill="black" stroke="black" points="1012.31,-330.72 1002.84,-325.97 1007.28,-335.59 1012.31,-330.72"/>
</g>
<!-- wcfsRead -->
<g id="node7" class="node">
<title>wcfsRead</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1334.23" cy="-299.56" rx="60.39" ry="18"/>
<text text-anchor="middle" x="1334.23" y="-295.86" font-family="Times,serif" font-size="14.00">read(#blk)</text>
</g>
<!-- wcfs&#45;&gt;wcfsRead -->
<g id="edge6" class="edge">
<title>wcfs&#45;&gt;wcfsRead</title>
<path fill="none" stroke="black" d="M1090.2,-379.57C1135.63,-364.86 1228.05,-334.94 1285.2,-316.44"/>
<polygon fill="black" stroke="black" points="1286.32,-319.75 1294.75,-313.34 1284.16,-313.09 1286.32,-319.75"/>
</g>
<!-- wcfsGC -->
<g id="node8" class="node">
<title>wcfsGC</title>
<ellipse fill="none" stroke="black" cx="1147.23" cy="-299.56" rx="41.94" ry="26.74"/>
<text text-anchor="middle" x="1147.23" y="-303.36" font-family="Times,serif" font-size="14.00">GC</text>
<text text-anchor="middle" x="1147.23" y="-288.36" font-family="Times,serif" font-size="14.00">@rev/</text>
</g>
<!-- wcfs&#45;&gt;wcfsGC -->
<g id="edge7" class="edge">
<title>wcfs&#45;&gt;wcfsGC</title>
<path fill="none" stroke="grey" d="M1077.86,-373.02C1089.14,-361.23 1105.1,-344.56 1118.87,-330.18"/>
<polygon fill="grey" stroke="grey" points="1121.44,-332.55 1125.83,-322.91 1116.39,-327.71 1121.44,-332.55"/>
</g>
<!-- client -->
<g id="node3" class="node">
<title>client</title>
<ellipse fill="#f7f7f7" stroke="black" cx="1516.23" cy="-389.3" rx="36.29" ry="18"/>
<text text-anchor="middle" x="1516.23" y="-385.6" font-family="Times,serif" font-size="14.00">client</text>
</g>
<!-- invProto&#45;&gt;client -->
<g id="edge2" class="edge">
<title>invProto&#45;&gt;client</title>
<path fill="none" stroke="black" d="M1390.48,-389.3C1420.19,-389.3 1449.9,-389.3 1479.62,-389.3"/>
<polygon fill="black" stroke="black" points="1390.4,-385.8 1380.4,-389.3 1390.4,-392.8 1390.4,-385.8"/>
</g>
<!-- client&#45;&gt;wcfsRead -->
<g id="edge25" class="edge">
<title>client&#45;&gt;wcfsRead</title>
<path fill="none" stroke="black" d="M1490.92,-376.1C1460.61,-361.49 1409.49,-336.85 1373.92,-319.7"/>
<polygon fill="black" stroke="black" points="1375.31,-316.48 1364.78,-315.29 1372.27,-322.79 1375.31,-316.48"/>
</g>
<!-- setupWatch -->
<g id="node20" class="node">
<title>setupWatch</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1793.23" cy="-209.82" rx="84.29" ry="26.74"/>
<text text-anchor="middle" x="1793.23" y="-213.62" font-family="Times,serif" font-size="14.00">watches:</text>
<text text-anchor="middle" x="1793.23" y="-198.62" font-family="Times,serif" font-size="14.00">register/maint</text>
</g>
<!-- client&#45;&gt;setupWatch -->
<g id="edge26" class="edge">
<title>client&#45;&gt;setupWatch</title>
<path fill="none" stroke="black" d="M1551.39,-384.65C1614.72,-377.33 1744.19,-358.73 1774.23,-326.43 1793.83,-305.36 1797.56,-272.26 1796.99,-246.87"/>
<polygon fill="black" stroke="black" points="1800.48,-246.69 1796.53,-236.86 1793.49,-247.01 1800.48,-246.69"/>
</g>
<!-- clientInvHandle -->
<g id="node25" class="node">
<title>clientInvHandle</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1516.23" cy="-299.56" rx="103.48" ry="26.74"/>
<text text-anchor="middle" x="1516.23" y="-303.36" font-family="Times,serif" font-size="14.00">process</text>
<text text-anchor="middle" x="1516.23" y="-288.36" font-family="Times,serif" font-size="14.00">#blk invalidations</text>
</g>
<!-- client&#45;&gt;clientInvHandle -->
<g id="edge27" class="edge">
<title>client&#45;&gt;clientInvHandle</title>
<path fill="none" stroke="black" d="M1516.23,-370.97C1516.23,-361.16 1516.23,-348.51 1516.23,-336.65"/>
<polygon fill="black" stroke="black" points="1519.73,-336.52 1516.23,-326.52 1512.73,-336.52 1519.73,-336.52"/>
</g>
<!-- nowcfs -->
<g id="node26" class="node">
<title>nowcfs</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1701.23" cy="-299.56" rx="63.89" ry="18"/>
<text text-anchor="middle" x="1701.23" y="-295.86" font-family="Times,serif" font-size="14.00">!wcfs mode</text>
</g>
<!-- client&#45;&gt;nowcfs -->
<g id="edge28" class="edge">
<title>client&#45;&gt;nowcfs</title>
<path fill="none" stroke="black" d="M1541.56,-376.29C1572.27,-361.73 1624.39,-337 1660.7,-319.78"/>
<polygon fill="black" stroke="black" points="1662.49,-322.81 1670.03,-315.36 1659.49,-316.48 1662.49,-322.81"/>
</g>
<!-- ZODB_go_inv -->
<g id="node9" class="node">
<title>ZODB_go_inv</title>
<ellipse fill="#f2f2f2" stroke="black" cx="567.23" cy="-209.82" rx="76.24" ry="26.74"/>
<text text-anchor="middle" x="567.23" y="-213.62" font-family="Times,serif" font-size="14.00">ZODB/go</text>
<text text-anchor="middle" x="567.23" y="-198.62" font-family="Times,serif" font-size="14.00">invalidations</text>
</g>
<!-- wcfsInvProcess&#45;&gt;ZODB_go_inv -->
<g id="edge8" class="edge">
<title>wcfsInvProcess&#45;&gt;ZODB_go_inv</title>
<path fill="none" stroke="black" d="M899.62,-280.83C886.48,-278.01 872.98,-275.2 860.23,-272.69 768.6,-254.69 744.1,-258.18 653.23,-236.69 646.34,-235.07 639.19,-233.2 632.1,-231.24"/>
<polygon fill="black" stroke="black" points="633.05,-227.87 622.48,-228.51 631.14,-234.6 633.05,-227.87"/>
</g>
<!-- zconnCacheGet -->
<g id="node10" class="node">
<title>zconnCacheGet</title>
<ellipse fill="lightyellow" stroke="black" cx="915.23" cy="-209.82" rx="67.35" ry="26.74"/>
<text text-anchor="middle" x="915.23" y="-213.62" font-family="Times,serif" font-size="14.00">zonn.</text>
<text text-anchor="middle" x="915.23" y="-198.62" font-family="Times,serif" font-size="14.00">.Cache.Get</text>
</g>
<!-- wcfsInvProcess&#45;&gt;zconnCacheGet -->
<g id="edge9" class="edge">
<title>wcfsInvProcess&#45;&gt;zconnCacheGet</title>
<path fill="none" stroke="black" d="M959.66,-272.71C953.27,-263.81 946.02,-253.71 939.26,-244.29"/>
<polygon fill="black" stroke="black" points="941.9,-241.97 933.22,-235.89 936.21,-246.05 941.9,-241.97"/>
</g>
<!-- zobj2file -->
<g id="node11" class="node">
<title>zobj2file</title>
<ellipse fill="#f2f2f2" stroke="black" cx="222.23" cy="-209.82" rx="81.49" ry="18"/>
<text text-anchor="middle" x="222.23" y="-206.12" font-family="Times,serif" font-size="14.00">Z* → file/[]#blk</text>
</g>
<!-- wcfsInvProcess&#45;&gt;zobj2file -->
<g id="edge10" class="edge">
<title>wcfsInvProcess&#45;&gt;zobj2file</title>
<path fill="none" stroke="black" d="M901.36,-280.31C887.71,-277.47 873.59,-274.79 860.23,-272.69 693.5,-246.58 650.19,-253.05 482.23,-236.69 422.76,-230.9 355.65,-224.22 304.51,-219.1"/>
<polygon fill="black" stroke="black" points="304.81,-215.61 294.51,-218.1 304.11,-222.58 304.81,-215.61"/>
</g>
<!-- δFtail -->
<g id="node12" class="node">
<title>δFtail</title>
<ellipse fill="lightyellow" stroke="black" cx="1139.23" cy="-209.82" rx="37.89" ry="18"/>
<text text-anchor="middle" x="1139.23" y="-206.12" font-family="Times,serif" font-size="14.00">δFtail</text>
</g>
<!-- wcfsInvProcess&#45;&gt;δFtail -->
<g id="edge11" class="edge">
<title>wcfsInvProcess&#45;&gt;δFtail</title>
<path fill="none" stroke="black" d="M1022.56,-274.96C1043.97,-263.5 1069.98,-249.48 1093.23,-236.69 1097.73,-234.22 1102.46,-231.59 1107.11,-228.98"/>
<polygon fill="black" stroke="black" points="1108.86,-232.02 1115.87,-224.07 1105.44,-225.91 1108.86,-232.02"/>
</g>
<!-- fuseRetrieveCache -->
<g id="node13" class="node">
<title>fuseRetrieveCache</title>
<ellipse fill="lightyellow" stroke="black" cx="746.23" cy="-209.82" rx="84.29" ry="26.74"/>
<text text-anchor="middle" x="746.23" y="-213.62" font-family="Times,serif" font-size="14.00">FUSE:</text>
<text text-anchor="middle" x="746.23" y="-198.62" font-family="Times,serif" font-size="14.00">retrieve cache</text>
</g>
<!-- wcfsInvProcess&#45;&gt;fuseRetrieveCache -->
<g id="edge12" class="edge">
<title>wcfsInvProcess&#45;&gt;fuseRetrieveCache</title>
<path fill="none" stroke="black" d="M920.28,-276.65C886.3,-263.8 843.36,-247.56 808.64,-234.43"/>
<polygon fill="black" stroke="black" points="809.66,-231.07 799.07,-230.81 807.19,-237.62 809.66,-231.07"/>
</g>
<!-- _wcfs_zhead -->
<g id="node14" class="node">
<title>_wcfs_zhead</title>
<ellipse fill="lightyellow" stroke="black" cx="1042.23" cy="-209.82" rx="41.94" ry="26.74"/>
<text text-anchor="middle" x="1042.23" y="-213.62" font-family="Times,serif" font-size="14.00">.wcfs/</text>
<text text-anchor="middle" x="1042.23" y="-198.62" font-family="Times,serif" font-size="14.00">zhead</text>
</g>
<!-- wcfsInvProcess&#45;&gt;_wcfs_zhead -->
<g id="edge13" class="edge">
<title>wcfsInvProcess&#45;&gt;_wcfs_zhead</title>
<path fill="none" stroke="black" d="M997.08,-272.71C1003.93,-263.33 1011.74,-252.62 1018.92,-242.77"/>
<polygon fill="black" stroke="black" points="1021.97,-244.54 1025.03,-234.39 1016.31,-240.41 1021.97,-244.54"/>
</g>
<!-- wcfsRead&#45;&gt;δFtail -->
<g id="edge19" class="edge">
<title>wcfsRead&#45;&gt;δFtail</title>
<path fill="none" stroke="black" d="M1298.77,-284.98C1268.49,-273.09 1223.97,-254.91 1186.23,-236.69 1181.42,-234.37 1176.41,-231.8 1171.53,-229.2"/>
<polygon fill="black" stroke="black" points="1172.85,-225.93 1162.4,-224.22 1169.5,-232.08 1172.85,-225.93"/>
</g>
<!-- blktabGet -->
<g id="node19" class="node">
<title>blktabGet</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1334.23" cy="-209.82" rx="139.1" ry="26.74"/>
<text text-anchor="middle" x="1334.23" y="-213.62" font-family="Times,serif" font-size="14.00">blktab.Get(#blk):</text>
<text text-anchor="middle" x="1334.23" y="-198.62" font-family="Times,serif" font-size="14.00">manually + → ⌈rev(#blk)⌉</text>
</g>
<!-- wcfsRead&#45;&gt;blktabGet -->
<g id="edge18" class="edge">
<title>wcfsRead&#45;&gt;blktabGet</title>
<path fill="none" stroke="black" d="M1334.23,-281.23C1334.23,-271.42 1334.23,-258.77 1334.23,-246.91"/>
<polygon fill="black" stroke="black" points="1337.73,-246.78 1334.23,-236.78 1330.73,-246.78 1337.73,-246.78"/>
</g>
<!-- wcfsRead&#45;&gt;setupWatch -->
<g id="edge20" class="edge">
<title>wcfsRead&#45;&gt;setupWatch</title>
<path fill="none" stroke="black" d="M1367.56,-284.45C1379,-280.08 1392.01,-275.65 1404.23,-272.69 1532.59,-241.59 1569.64,-262.29 1699.23,-236.69 1707.14,-235.13 1715.36,-233.25 1723.5,-231.23"/>
<polygon fill="black" stroke="black" points="1724.54,-234.58 1733.36,-228.71 1722.81,-227.8 1724.54,-234.58"/>
</g>
<!-- headWatch -->
<g id="node21" class="node">
<title>headWatch</title>
<ellipse fill="#f2f2f2" stroke="black" cx="1591.23" cy="-209.82" rx="99.38" ry="18"/>
<text text-anchor="middle" x="1591.23" y="-206.12" font-family="Times,serif" font-size="14.00">#blk ← head/watch</text>
</g>
<!-- wcfsRead&#45;&gt;headWatch -->
<g id="edge21" class="edge">
<title>wcfsRead&#45;&gt;headWatch</title>
<path fill="none" stroke="black" d="M1370.11,-285.03C1381.04,-280.98 1393.1,-276.59 1404.23,-272.69 1447.93,-257.39 1497.71,-240.97 1534.92,-228.89"/>
<polygon fill="black" stroke="black" points="1536.15,-232.17 1544.58,-225.76 1533.99,-225.52 1536.15,-232.17"/>
</g>
<!-- fs1_go_inv -->
<g id="node15" class="node">
<title>fs1_go_inv</title>
<ellipse fill="lightyellow" stroke="black" cx="885.23" cy="-109.48" rx="76.24" ry="26.74"/>
<text text-anchor="middle" x="885.23" y="-113.28" font-family="Times,serif" font-size="14.00">fs1/go</text>
<text text-anchor="middle" x="885.23" y="-98.28" font-family="Times,serif" font-size="14.00">invalidations</text>
</g>
<!-- ZODB_go_inv&#45;&gt;fs1_go_inv -->
<g id="edge14" class="edge">
<title>ZODB_go_inv&#45;&gt;fs1_go_inv</title>
<path fill="none" stroke="black" d="M623.54,-191.43C633.41,-188.51 643.6,-185.58 653.23,-182.95 718.12,-165.26 736.43,-168.29 800.23,-146.95 810.27,-143.6 820.77,-139.55 830.84,-135.39"/>
<polygon fill="black" stroke="black" points="832.47,-138.5 840.33,-131.38 829.75,-132.05 832.47,-138.5"/>
</g>
<!-- zeo_go_inv -->
<g id="node16" class="node">
<title>zeo_go_inv</title>
<ellipse fill="lightyellow" stroke="black" cx="396.23" cy="-109.48" rx="76.24" ry="26.74"/>
<text text-anchor="middle" x="396.23" y="-113.28" font-family="Times,serif" font-size="14.00">zeo/go</text>
<text text-anchor="middle" x="396.23" y="-98.28" font-family="Times,serif" font-size="14.00">invalidations</text>
</g>
<!-- ZODB_go_inv&#45;&gt;zeo_go_inv -->
<g id="edge15" class="edge">
<title>ZODB_go_inv&#45;&gt;zeo_go_inv</title>
<path fill="none" stroke="black" d="M528.42,-186.51C503.28,-172.05 470.37,-153.12 443.67,-137.76"/>
<polygon fill="black" stroke="black" points="445.29,-134.66 434.87,-132.7 441.8,-140.72 445.29,-134.66"/>
</g>
<!-- neo_go_inv -->
<g id="node17" class="node">
<title>neo_go_inv</title>
<ellipse fill="none" stroke="black" cx="567.23" cy="-109.48" rx="76.24" ry="26.74"/>
<text text-anchor="middle" x="567.23" y="-113.28" font-family="Times,serif" font-size="14.00">neo/go</text>
<text text-anchor="middle" x="567.23" y="-98.28" font-family="Times,serif" font-size="14.00">invalidations</text>
</g>
<!-- ZODB_go_inv&#45;&gt;neo_go_inv -->
<g id="edge16" class="edge">
<title>ZODB_go_inv&#45;&gt;neo_go_inv</title>
<path fill="none" stroke="black" d="M567.23,-182.73C567.23,-171.64 567.23,-158.52 567.23,-146.53"/>
<polygon fill="black" stroke="black" points="570.73,-146.35 567.23,-136.35 563.73,-146.35 570.73,-146.35"/>
</g>
<!-- zcache_go_inv -->
<g id="node18" class="node">
<title>zcache_go_inv</title>
<ellipse fill="none" stroke="grey" cx="726.23" cy="-109.48" rx="65.11" ry="26.74"/>
<text text-anchor="middle" x="726.23" y="-113.28" font-family="Times,serif" font-size="14.00" fill="grey">ZCache/go</text>
<text text-anchor="middle" x="726.23" y="-98.28" font-family="Times,serif" font-size="14.00" fill="grey">←watchq</text>
</g>
<!-- ZODB_go_inv&#45;&gt;zcache_go_inv -->
<g id="edge17" class="edge">
<title>ZODB_go_inv&#45;&gt;zcache_go_inv</title>
<path fill="none" stroke="grey" stroke-dasharray="5,2" d="M604.1,-186.01C627.47,-171.56 657.81,-152.79 682.41,-137.58"/>
<polygon fill="grey" stroke="grey" points="684.52,-140.39 691.18,-132.15 680.84,-134.43 684.52,-140.39"/>
</g>
<!-- zblk2file -->
<g id="node22" class="node">
<title>zblk2file</title>
<ellipse fill="lightyellow" stroke="black" cx="62.23" cy="-109.48" rx="62.45" ry="37.45"/>
<text text-anchor="middle" x="62.23" y="-120.78" font-family="Times,serif" font-size="14.00">ZBlk*</text>
<text text-anchor="middle" x="62.23" y="-105.78" font-family="Times,serif" font-size="14.00"></text>
<text text-anchor="middle" x="62.23" y="-90.78" font-family="Times,serif" font-size="14.00">file/[]#blk</text>
</g>
<!-- zobj2file&#45;&gt;zblk2file -->
<g id="edge22" class="edge">
<title>zobj2file&#45;&gt;zblk2file</title>
<path fill="none" stroke="black" d="M195.8,-192.58C173.54,-178.9 141.03,-158.91 113.65,-142.09"/>
<polygon fill="black" stroke="black" points="115.32,-139 104.96,-136.75 111.65,-144.97 115.32,-139"/>
</g>
<!-- zbtree2file -->
<g id="node23" class="node">
<title>zbtree2file</title>
<ellipse fill="none" stroke="black" cx="222.23" cy="-109.48" rx="79.81" ry="37.45"/>
<text text-anchor="middle" x="222.23" y="-120.78" font-family="Times,serif" font-size="14.00">BTree/Bucket</text>
<text text-anchor="middle" x="222.23" y="-105.78" font-family="Times,serif" font-size="14.00"></text>
<text text-anchor="middle" x="222.23" y="-90.78" font-family="Times,serif" font-size="14.00">file/[]#blk</text>
</g>
<!-- zobj2file&#45;&gt;zbtree2file -->
<g id="edge23" class="edge">
<title>zobj2file&#45;&gt;zbtree2file</title>
<path fill="none" stroke="black" d="M222.23,-191.69C222.23,-182.06 222.23,-169.53 222.23,-157.2"/>
<polygon fill="black" stroke="black" points="225.73,-156.96 222.23,-146.96 218.73,-156.96 225.73,-156.96"/>
</g>
<!-- fileSock -->
<g id="node27" class="node">
<title>fileSock</title>
<ellipse fill="lightyellow" stroke="black" cx="1316.23" cy="-109.48" rx="50.09" ry="18"/>
<text text-anchor="middle" x="1316.23" y="-105.78" font-family="Times,serif" font-size="14.00">FileSock</text>
</g>
<!-- _wcfs_zhead&#45;&gt;fileSock -->
<g id="edge31" class="edge">
<title>_wcfs_zhead&#45;&gt;fileSock</title>
<path fill="none" stroke="black" d="M1073.32,-191.73C1079.5,-188.63 1086,-185.55 1092.23,-182.95 1151.27,-158.32 1221.85,-136.72 1267.78,-123.65"/>
<polygon fill="black" stroke="black" points="1268.81,-126.99 1277.48,-120.91 1266.91,-120.26 1268.81,-126.99"/>
</g>
<!-- headWatch&#45;&gt;fileSock -->
<g id="edge30" class="edge">
<title>headWatch&#45;&gt;fileSock</title>
<path fill="none" stroke="black" d="M1548.26,-193.46C1497.03,-175.14 1411.86,-144.68 1360.19,-126.2"/>
<polygon fill="black" stroke="black" points="1361.23,-122.85 1350.64,-122.78 1358.87,-129.45 1361.23,-122.85"/>
</g>
<!-- δBTree -->
<g id="node24" class="node">
<title>δBTree</title>
<ellipse fill="#f2f2f2" stroke="black" cx="222.23" cy="-18" rx="50.89" ry="18"/>
<text text-anchor="middle" x="222.23" y="-14.3" font-family="Times,serif" font-size="14.00">δ(BTree)</text>
</g>
<!-- zbtree2file&#45;&gt;δBTree -->
<g id="edge24" class="edge">
<title>zbtree2file&#45;&gt;δBTree</title>
<path fill="none" stroke="grey" d="M222.23,-71.82C222.23,-63.33 222.23,-54.43 222.23,-46.42"/>
<polygon fill="grey" stroke="grey" points="225.73,-46.15 222.23,-36.15 218.73,-46.15 225.73,-46.15"/>
</g>
<!-- clientInvHandle&#45;&gt;headWatch -->
<g id="edge29" class="edge">
<title>clientInvHandle&#45;&gt;headWatch</title>
<path fill="none" stroke="black" d="M1537.92,-273.19C1548.04,-261.34 1560.05,-247.3 1570.1,-235.54"/>
<polygon fill="black" stroke="black" points="1572.86,-237.69 1576.7,-227.82 1567.54,-233.15 1572.86,-237.69"/>
</g>
</g>
</svg>
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""wcfs_test.py tests wcfs filesystem from outside as python client process.
Virtmem layer provided by wcfs client package is unit-tested by
wcfs/client/client_test.py .
At functional level, the whole wendelin.core test suite is used to verify
wcfs.py/wcfs.go while running tox tests in wcfs mode.
"""
from __future__ import print_function, absolute_import
from wendelin.lib.testing import getTestDB
from wendelin.lib.zodb import dbclose, zstor_2zurl
from wendelin.lib.mem import memcpy
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.bigfile.tests.test_filezodb import blksize
from wendelin import wcfs
import transaction
from persistent import Persistent
from persistent.timestamp import TimeStamp
from ZODB.utils import z64, u64, p64
import sys, os, os.path, subprocess
from thread import get_ident as gettid
from time import gmtime
from errno import EINVAL, ENOENT, ENOTCONN
from stat import S_ISDIR
from signal import SIGQUIT, SIGKILL
from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, default, error, b
from golang import context, errors, sync, time
from zodbtools.util import ashex as h, fromhex
import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
from wendelin.wcfs.internal import io, mm
from wendelin.wcfs.internal.wcfs_test import _tDB, read_nogil, install_sigbus_trap, fadvise_dontneed
from wendelin.wcfs.client._wcfs import _tpywlinkwrite as _twlinkwrite
# setup:
# - create test database, compute zurl and mountpoint for wcfs
# - at every test: make sure wcfs is not running before & after the test.
testdb = None
testzurl = None # URL of testdb
testmntpt = None # wcfs is mounted here
def setup_module():
# if wcfs.py receives SIGBUS because wcfs.go panics while serving mmap'ed
# read, we want to see python-level traceback instead of being killed.
install_sigbus_trap()
# if wcfs.go is built with race detector and detects a race - make it fail
# current test loudly on the first wcfs.go race.
gorace = os.environ.get("GORACE", "")
if gorace != "":
gorace += " "
os.environ["GORACE"] = gorace + "halt_on_error=1"
# ↑ memlock soft-limit till its hard maximum
# (tFile needs ~ 64M to mlock while default memlock soft-limit is usually 64K)
memlockS, memlockH = getrlimit(RLIMIT_MEMLOCK)
if memlockS != memlockH:
setrlimit(RLIMIT_MEMLOCK, (memlockH, memlockH))
global testdb, testzurl, testmntpt
testdb = getTestDB()
testdb.setup()
zstor = testdb.getZODBStorage()
testzurl = zstor_2zurl(zstor)
zstor.close()
testmntpt = wcfs._mntpt_4zurl(testzurl)
os.rmdir(testmntpt)
def teardown_module():
testdb.teardown()
# make sure we start every test without wcfs server running.
def setup_function(f):
assert not os.path.exists(testmntpt)
# make sure we unmount wcfs after every test.
# (tDB checks this in more detail, but join tests don't use tDB)
def teardown_function(f):
mounted = is_mountpoint(testmntpt)
if mounted:
subprocess.check_call(["fusermount", "-u", testmntpt])
if os.path.exists(testmntpt):
os.rmdir(testmntpt)
# ---- test join/autostart ----
# test that join works.
@func
def test_join():
zurl = testzurl
with raises(RuntimeError, match="wcfs: join .*: server not started"):
wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
def _():
assert wcfs._wcregistry == {}
defer(_)
wc = wcfs._start(zurl)
defer(wc.close)
assert wc.mountpoint == testmntpt
assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile")
wc2 = wcfs.join(zurl, autostart=False)
defer(wc2.close)
assert wc2 is wc
assert wc._njoin == 2
# test that join(autostart=y) works.
@func
def test_join_autostart():
zurl = testzurl
with raises(RuntimeError, match="wcfs: join .*: server not started"):
wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
def _():
assert wcfs._wcregistry == {}
defer(_)
wc = wcfs.join(zurl, autostart=True)
defer(wc.close)
assert wc.mountpoint == testmntpt
assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile")
# ---- infrastructure for data access tests ----
#
# Testing infrastructure consists of tDB, tFile, tWatch and tWatchLink that
# jointly organize wcfs behaviour testing. See individual classes for details.
# many tests need to be run with some reasonable timeout to detect lack of wcfs
# response. with_timeout and timeout provide syntactic shortcuts to do so.
def with_timeout(parent=context.background()): # -> ctx, cancel
return context.with_timeout(parent, 3*time.second)
def timeout(parent=context.background()): # -> ctx
ctx, _ = with_timeout()
return ctx
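# ( e.g. procwait_(timeout(), proc) - as used in tDB.close below - waits for
#   proc to exit, but not longer than 3 seconds )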
# tdelay is used in places where we need to delay a bit in order to e.g.
# increase probability of a bug due to race condition.
def tdelay():
time.sleep(10*time.millisecond)
# DF represents a change in files space.
# it corresponds to ΔF in wcfs.go .
class DF:
# .rev tid
# .byfile {} ZBigFile -> DFile
def __init__(dF):
# rev set from outside
dF.byfile = {}
# DFile represents a change to one file.
# it is similar to ΔFile in wcfs.go .
class DFile:
# .rev tid
# .ddata {} blk -> data
def __init__(dfile):
# rev set from outside
dfile.ddata = {}
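# ( example: after t.commit(zf, {1: b'd1', 3: b'd3'}) the appended DF has
#   dF.rev = <tid of that commit>, dF.byfile = {zf: dfile} with
#   dfile.rev = dF.rev and dfile.ddata = {1: b'd1', 3: b'd3'} )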
# tDB provides database/wcfs testing environment.
#
# Database root and wcfs connection are represented by .root and .wc correspondingly.
# The database is initialized with one ZBigFile created and opened via ZODB connection as .zfile .
#
# The primary way to access wcfs is by opening BigFiles and WatchLinks.
# A BigFile opened under tDB is represented as tFile - see .open for details.
# A WatchLink opened under tDB is represented as tWatchLink - see .openwatch for details.
#
# The database can be mutated (via !wcfs codepath) with .change + .commit .
# Current database head is represented by .head .
# The history of the changes is kept in .dFtail .
# There are various helpers to query history (_blkDataAt, _pinnedAt, .iter_revv, ...)
#
# tDB must be explicitly closed once no longer used.
#
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tDB(_tDB):
@func
def __init__(t):
t.root = testdb.dbopen()
def _(): # close/unlock db if __init__ fails
exc = sys.exc_info()[1]
if exc is not None:
dbclose(t.root)
defer(_)
assert not os.path.exists(testmntpt)
t.wc = wcfs.join(testzurl, autostart=True)
assert os.path.exists(testmntpt)
assert is_mountpoint(testmntpt)
# force-unmount wcfs on timeout to unstuck current test and let it fail.
# Force-unmount can be done reliably only by writing into
# /sys/fs/fuse/connections/<X>/abort. For everything else there are
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal )
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w")
nogilready = chan(dtype='C.structZ')
go(t._abort_ontimeout, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil
# ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data
# committed: (tail, head] + δF history
t.tail = t.root._p_jar.db().storage.lastTransaction()
t.dFtail = [] # of DF; head = dFtail[-1].rev
# fh(.wcfs/zhead) + history of zhead read from there
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
t._wc_zheadv = []
# whether head/ ZBigFile(s) blocks were ever accessed via wcfs.
# this is updated only explicitly via ._blkheadaccess() .
t._blkaccessedViaHead = {} # ZBigFile -> set(blk) XXX ZF -> foid ? (threads)
# tracked opened tFiles & tWatchLinks
t._files = set()
t._wlinks = set()
# ID of the thread which created tDB
# ( transaction plays dirty games with threading.local and we have to
# check the thread is the same when .root is used )
t._maintid = gettid()
# prepare initial objects for test: zfile, nonzfile
t.root['!file'] = t.nonzfile = Persistent()
t.root['zfile'] = t.zfile = ZBigFile(blksize)
t.at0 = t.commit()
@property
def head(t):
return t.dFtail[-1].rev
# _abort_ontimeout is in wcfs_test.pyx
# close closes test database as well as all tracked files, watch links and wcfs.
# it also prints change history to help developer overview current testcase.
@func
def close(t):
defer(t._wcfuseabort.close)
defer(t._closed.close)
defer(lambda: dbclose(t.root))
# unmount and wait for wcfs to exit
def _():
assert not is_mountpoint(testmntpt)
os.rmdir(testmntpt)
defer(_)
def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit")
eprint("-> kill -QUIT wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGQUIT)
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit (after SIGQUIT)")
eprint("-> kill -KILL wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGKILL)
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
eprint("-> nothing we can do...\n") # XXX dump /proc/pid/task/*/stack instead (ignore EPERM)
fail("wcfs.go does not exit even after SIGKILL")
defer(_)
def _():
#if not ready(t._wcfuseaborted): XXX kill _wcfuseaborted ?
# assert 0 == subprocess.call(["mountpoint", "-q", testmntpt])
assert is_mountpoint(testmntpt)
subprocess.check_call(["fusermount", "-u", testmntpt])
defer(_)
defer(t.dump_history)
for tf in t._files.copy():
tf.close()
for tw in t._wlinks.copy():
tw.close()
assert len(t._files) == 0
assert len(t._wlinks) == 0
t._wc_zheadfh.close()
t.wc.close()
# open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details.
def open(t, zf, at=None): # -> tFile
return tFile(t, zf, at=at)
# openwatch opens /head/watch on wcfs.
# see returned tWatchLink for details.
def openwatch(t): # -> tWatchLink
return tWatchLink(t)
# change schedules zf to be changed according to changeDelta at commit.
#
# changeDelta: {} blk -> data.
# data can be both bytes and unicode.
def change(t, zf, changeDelta):
assert isinstance(zf, ZBigFile)
zfDelta = t._changed.setdefault(zf, {})
for blk, data in changeDelta.iteritems():
data = b(data)
assert len(data) <= zf.blksize
zfDelta[blk] = data
# commit commits transaction and makes sure wcfs is synchronized to it.
#
# It updates .dFtail and returns committed transaction ID.
#
# zf and changeDelta can be optionally provided, in which case .change(zf,
# changeDelta) call is made before actually committing.
def commit(t, zf=None, changeDelta=None): # -> tAt
if zf is not None:
assert changeDelta is not None
t.change(zf, changeDelta)
# perform modifications scheduled by change.
# use !wcfs mode so that we prepare data independently of wcfs code paths.
dF = DF()
zconns = set()
for zf, zfDelta in t._changed.items():
dfile = DFile()
zconns.add(zf._p_jar)
zfh = zf.fileh_open(_use_wcfs=False)
for blk, data in zfDelta.iteritems():
dfile.ddata[blk] = data
data += b'\0'*(zf.blksize - len(data)) # trailing \0
vma = zfh.mmap(blk, 1)
memcpy(vma, data)
dF.byfile[zf] = dfile
# verify that all changed objects come from the same ZODB connection
assert len(zconns) in (0, 1) # either nothing to commit or all from the same zconn
if len(zconns) == 1:
zconn = zconns.pop()
root = zconn.root()
else:
# no objects to commit
root = t.root
assert gettid() == t._maintid
# perform the commit. NOTE there is no clean way to retrieve tid of
# just committed transaction - we use last._p_serial as workaround.
root['_last'] = last = Persistent()
last._p_changed = 1
transaction.commit()
head = tAt(t, last._p_serial)
dF.rev = head
for dfile in dF.byfile.values():
dfile.rev = head
t.dFtail.append(dF)
assert t.head == head # self-check
print('\nM: commit -> %s' % head)
for zf, zfDelta in t._changed.items():
print('M: f<%s>\t%s' % (h(zf._p_oid), sorted(zfDelta.keys())))
t._changed = {}
# synchronize wcfs to db, and we are done
t._wcsync()
return head
# _wcsync makes sure wcfs is synchronized to latest committed transaction.
def _wcsync(t):
while len(t._wc_zheadv) < len(t.dFtail):
l = t._wc_zheadfh.readline()
#print('> zhead read: %r' % l)
l = l.rstrip('\n')
wchead = tAt(t, fromhex(l))
i = len(t._wc_zheadv)
if wchead != t.dFtail[i].rev:
raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, wchead, t.dFtail[i].rev))
t._wc_zheadv.append(wchead)
# head/at = last txn of whole db
assert t.wc._read("head/at") == h(t.head)
# _blkheadaccess marks head/zf[blk] accessed.
def _blkheadaccess(t, zf, blk):
# XXX locking needed? or we do everything serially?
t._blkaccessed(zf).add(blk)
# _blkaccessed returns the set of head/zf blocks that were ever accessed.
def _blkaccessed(t, zf): # set(blk)
return t._blkaccessedViaHead.setdefault(zf, set())
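# The snippet below is a minimal illustration (not used by the tests) of the
# workaround tDB.commit employs above to learn the tid of a just-committed
# transaction: commit a throw-away Persistent object and read back its _p_serial,
# since ZODB does not provide a clean way to retrieve that tid directly.
def _zcommit_tid_sketch(root):  # -> tid of the transaction that was just committed
    root['_last'] = last = Persistent()
    last._p_changed = 1
    transaction.commit()
    return last._p_serial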
# tFile provides testing environment for one bigfile opened on wcfs.
#
# ._blk() provides access to data of a block. .cached() gives state of which
# blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
# on state of cache and data.
class tFile:
# maximum number of pages we mmap for 1 file.
# this should not be too big, so that we do not exceed the mlock limit.
_max_tracked_pages = 8
def __init__(t, tdb, zf, at=None):
assert isinstance(zf, ZBigFile)
t.tdb = tdb
t.zf = zf
t.at = at
t.f = tdb.wc._open(zf, at=at)
t.blksize = zf.blksize
t.fmmap = None
tdb._files.add(t)
# make sure that wcfs reports zf.blksize as preferred block size for IO.
# wcfs.py also uses .st_blksize in blk -> byte offset computation.
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize
# mmap the file past the end up to _max_tracked_pages and setup
# invariants on which we rely to verify OS cache state:
#
# 1. lock pages with MLOCK_ONFAULT: this way after a page is read by
# mmap access we have the guarantee from kernel that the page will
# stay in pagecache.
#
# 2. madvise memory with MADV_SEQUENTIAL and MADV_RANDOM in interleaved
# mode. This adjusts kernel readahead (which triggers for
# MADV_NORMAL or MADV_SEQUENTIAL vma) to not go over to next block
# and thus a read access to one block won't trigger implicit read
# access to its neighbour block.
#
# https://www.quora.com/What-heuristics-does-the-adaptive-readahead-implementation-in-the-Linux-kernel-use
# https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/madvise.c?h=v5.2-rc4#n51
#
# we don't use MADV_NORMAL instead of MADV_SEQUENTIAL, because for
# MADV_NORMAL, there is not only read-ahead, but also read-around,
# which might result in accessing previous block.
#
# we don't disable readahead universally, since enabled readahead
# helps to test how wcfs handles simultaneous read triggered by
# async kernel readahead vs wcfs uploading data for the same block
# into OS cache. Also, fully enabled readahead is how wcfs is
# actually used in practice.
assert t.blksize % mm.PAGE_SIZE == 0
t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize)
mm.lock(t.fmmap, mm.MLOCK_ONFAULT)
for blk in range(t._max_tracked_pages):
blkmmap = t.fmmap[blk*t.blksize:(blk+1)*t.blksize]
# NOTE the kernel does not start readahead from access to
# MADV_RANDOM vma, but for a MADV_{NORMAL/SEQUENTIAL} vma it starts
# readahead which can go _beyond_ vma that was used to decide RA
# start. For this reason - to prevent RA started at one block from
# overlapping with the next block - we put MADV_RANDOM vma at the end of
# every block, covering the last 1/8 of it.
# XXX implicit assumption that RA window is < 1/8·blksize
#
# NOTE with a block completely covered by MADV_RANDOM the kernel
# issues 4K sized reads; wcfs starts uploading into cache almost
# immediately, but the kernel still issues many reads to read the
# full 2MB of the block. This is slow.
# XXX -> investigate and maybe make read(while-uploading) wait for
# uploading to complete and only then return? (maybe it will help
# performance even in normal case)
_ = len(blkmmap)*7//8
mm.advise(blkmmap[:_], mm.MADV_SEQUENTIAL)
mm.advise(blkmmap[_:], mm.MADV_RANDOM)
def close(t):
t.tdb._files.remove(t)
if t.fmmap is not None:
mm.unmap(t.fmmap)
t.f.close()
# _blk returns memoryview of file[blk].
# when/if block memory is accessed, the user has to notify tFile with _blkaccess call.
def _blk(t, blk):
assert blk <= t._max_tracked_pages
return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])
def _blkaccess(t, blk):
if t.at is None: # notify tDB only for head/file access
t.tdb._blkheadaccess(t.zf, blk)
# cached returns [] indicating whether each file block is cached or not.
# 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached, some are not.
def cached(t):
l = t._sizeinblk()
incorev = mm.incore(t.fmmap[:l*t.blksize])
# incorev is in pages; convert to in blocks
assert t.blksize % mm.PAGE_SIZE == 0
blkpages = t.blksize // mm.PAGE_SIZE
cachev = [0.]*l
for i, v in enumerate(incorev):
blk = i // blkpages
cachev[blk] += bool(v)
for blk in range(l):
cachev[blk] /= blkpages
if cachev[blk] == int(cachev[blk]):
cachev[blk] = int(cachev[blk]) # 0.0 -> 0, 1.0 -> 1
return cachev
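# Example (illustration only): with blkpages = 4 and incorev = [1,1,0,0, 1,1,1,1]
# the per-block page tallies are 2 and 4, so cached() -> [0.5, 1]:
# half of block 0 and all of block 1 are in the OS pagecache.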
# _sizeinblk returns file size in blocks.
def _sizeinblk(t):
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize # just in case
assert st.st_size % t.blksize == 0
assert st.st_size // t.blksize <= t._max_tracked_pages
return st.st_size // t.blksize
# assertCache asserts on state of OS cache for file.
#
# incorev is [] of 1/0 representing whether block data is present or not.
def assertCache(t, incorev):
assert t.cached() == incorev
# assertBlk asserts that file[blk] has data as expected.
#
# Expected data may be given with size < t.blksize. In such case the data
# is implicitly appended with trailing zeros. Data can be both bytes and unicode.
#
# It also checks that file watches are properly notified on data access -
# - see "7.2) for all registered client@at watchers ..."
#
# pinokByWLink: {} tWatchLink -> {} blk -> at.
# pinokByWLink can be omitted - in that case it is computed automatically.
#
# The automatic computation of pinokByWLink is verified against explicitly
# provided pinokByWLink when it is present.
@func
def assertBlk(t, blk, dataok, pinokByWLink=None):
# XXX -> assertCtx('blk #%d' % blk)
def _():
assertCtx = 'blk #%d' % blk
_, e, _ = sys.exc_info()
if isinstance(e, AssertionError):
assert len(e.args) == 1 # pytest puts everything as args[0]
e.args = (assertCtx + "\n" + e.args[0],)
defer(_)
dataok = b(dataok)
blkdata, _ = t.tdb._blkDataAt(t.zf, blk, t.at)
assert blkdata == dataok, "computed vs explicit data"
t._assertBlk(blk, dataok, pinokByWLink)
@func
def _assertBlk(t, blk, dataok, pinokByWLink=None, pinfunc=None):
assert len(dataok) <= t.blksize
dataok += b'\0'*(t.blksize - len(dataok)) # trailing zeros
assert blk < t._sizeinblk()
# access to this block must not trigger access to other blocks
incore_before = t.cached()
def _():
incore_after = t.cached()
incore_before[blk] = 'x'
incore_after [blk] = 'x'
assert incore_before == incore_after
defer(_)
cached = t.cached()[blk]
assert cached in (0, 1) # every check accesses a block in full
shouldPin = False # whether at least one wlink should receive a pin
# watches that must be notified if access goes to @head/file
wpin = {} # tWatchLink -> pinok
blkrev = t.tdb._blkRevAt(t.zf, blk, t.at)
if t.at is None: # @head/...
for wlink in t.tdb._wlinks:
pinok = {}
w = wlink._watching.get(t.zf._p_oid)
if w is not None and w.at < blkrev:
if cached == 1:
# @head[blk].rev is after w.at - w[blk] must be already pinned
assert blk in w.pinned
assert w.pinned[blk] <= w.at
else:
assert cached == 0
# even if @head[blk] is uncached, the block could be
# already pinned by setup watch
if blk not in w.pinned:
pinok = {blk: t.tdb._blkRevAt(t.zf, blk, w.at)}
shouldPin = True
wpin[wlink] = pinok
if pinokByWLink is not None:
assert wpin == pinokByWLink, "computed vs explicit pinokByWLink"
pinokByWLink = wpin
# doCheckingPin expects every wlink entry to also contain zf
for wlink, pinok in pinokByWLink.items():
pinokByWLink[wlink] = (t.zf, pinok)
# access 1 byte on the block and verify that wcfs sends us correct pins
blkview = t._blk(blk)
assert t.cached()[blk] == cached
def _(ctx, ev):
assert t.cached()[blk] == cached
ev.append('read pre')
# access data with released GIL so that the thread that reads data from
# head/watch can receive pin message. Be careful to handle cancellation,
# so that on error in another worker we don't get stuck and the
# error can be propagated to wait and reported.
#
# XXX after WatchLink is moved to pyx/nogil, do we still need to do
# this here with nogil?
have_read = chan(1)
def _():
b = read_nogil(blkview[0:1])
t._blkaccess(blk)
have_read.send(b)
go(_)
_, _rx = select(
ctx.done().recv, # 0
have_read.recv, # 1
)
if _ == 0:
raise ctx.err()
b = _rx
ev.append('read ' + b)
ev = doCheckingPin(_, pinokByWLink, pinfunc)
# XXX hack - wlinks are notified and emit events simultaneously - we
# check only that events begin and end with read pre/post and that pins
# are inside (i.e. read is stuck until pins are acknowledged).
# Better do explicit check in tracetest style.
assert ev[0] == 'read pre', ev
assert ev[-1] == 'read ' + dataok[0], ev
ev = ev[1:-1]
if not shouldPin:
assert ev == []
else:
assert 'pin rx' in ev
assert 'pin ack pre' in ev
assert t.cached()[blk] > 0
# verify full data of the block
# XXX assert individually for every block's page? (easier debugging?)
assert blkview.tobytes() == dataok
# we just accessed the block in full - it has to be in OS cache completely
assert t.cached()[blk] == 1
# assertData asserts that file has data blocks as specified.
#
# Expected blocks may be given with size < zf.blksize. In such case they
# are implicitly appended with trailing zeros. If a block is specified as
# 'x' - this particular block is not accessed and is not checked.
#
# The file size and optionally mtime are also verified.
def assertData(t, dataokv, mtime=None):
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize
assert st.st_size == len(dataokv)*t.blksize
if mtime is not None:
assert st.st_mtime == tidtime(mtime)
cachev = t.cached()
for blk, dataok in enumerate(dataokv):
if dataok == 'x':
continue
t.assertBlk(blk, dataok)
cachev[blk] = 1
# all accessed blocks must be in cache after we touched them all
t.assertCache(cachev)
# tWatch represents watch for one file setup on a tWatchLink.
class tWatch:
def __init__(w, foid):
w.foid = foid
w.at = z64 # not None - always concrete
w.pinned = {} # blk -> rev
# tWatchLink provides testing environment for /head/watch link opened on wcfs.
#
# .watch() setups/adjusts a watch for a file and verifies that wcfs correctly sends initial pins.
class tWatchLink(wcfs.WatchLink):
def __init__(t, tdb):
super(tWatchLink, t).__init__(tdb.wc)
t.tdb = tdb
tdb._wlinks.add(t)
# this tWatchLink currently watches the following files at particular state.
t._watching = {} # {} foid -> tWatch
def close(t):
t.tdb._wlinks.remove(t)
super(tWatchLink, t).close()
# disable all established watches
for w in t._watching.values():
w.at = z64
w.pinned = {}
t._watching = {}
# XXX just wrap req.at with tAt inplace
"""
# recvReq is the same as WatchLink.recvReq but returns tSrvReq instead of PinReq.
def recvReq(t, ctx): # -> tSrvReq | None when EOF
req = super(tWatchLink, t).recvReq(ctx)
if req is not None:
assert req.__class__ is wcfs.PinReq
req.__class__ = tSrvReq
return req
class tSrvReq(wcfs.PinReq):
# _parse is the same as PinReq._parse, but returns at wrapped with tAt.
# XXX -> just wrap `at`
def _parse(req): # -> (foid, blk, at|None)
foid, blk, at = super(tSrvReq, req)._parse()
if at is not None:
at = tAt(req.wlink.tdb, at)
return foid, blk, at
"""
# ---- infrastructure: watch setup/adjust ----
# watch sets up or adjusts a watch for file@at.
#
# During setup it verifies that wcfs sends correct initial/update pins.
#
# pinok: {} blk -> rev
# pinok can be omitted - in that case it is computed automatically.
#
# The automatic computation of pinok is verified against explicitly provided
# pinok when it is present.
@func(tWatchLink)
def watch(twlink, zf, at, pinok=None): # -> tWatch
foid = zf._p_oid
t = twlink.tdb
w = twlink._watching.get(foid)
if w is None:
w = twlink._watching[foid] = tWatch(foid)
at_prev = None
else:
at_prev = w.at # we were previously watching zf @at_prev
at_from = ''
if at_prev is not None:
at_from = '(%s ->) ' % at_prev
print('\nC: setup watch f<%s> %s%s' % (h(foid), at_from, at))
accessed = t._blkaccessed(zf)
lastRevOf = lambda blk: t._blkRevAt(zf, blk, t.head)
pin_prev = {}
if at_prev is not None:
assert at_prev <= at, 'TODO %s -> %s' % (at_prev, at)
pin_prev = t._pinnedAt(zf, at_prev)
assert w.pinned == pin_prev
pin = t._pinnedAt(zf, at)
if at_prev != at and at_prev is not None:
print('# pin@old: %s\n# pin@new: %s' % (t.hpin(pin_prev), t.hpin(pin)))
for blk in set(pin_prev.keys()).union(pin.keys()):
# blk ∉ pin_prev, blk ∉ pin -> cannot happen
assert (blk in pin_prev) or (blk in pin)
# blk ∉ pin_prev, blk ∈ pin -> cannot happen, except on first start
if blk not in pin_prev and blk in pin:
if at_prev is not None:
fail('#%d pinned %s; not pinned %s' % (blk, at, at_prev))
# blk ∈ pin -> blk is tracked; has rev > at
# (see criteria in _pinnedAt)
assert blk in accessed
assert at < lastRevOf(blk)
# blk ∈ pin_prev, blk ∉ pin -> unpin to head
elif blk in pin_prev and blk not in pin:
# blk ∈ pin_prev -> blk is tracked; has rev > at_prev
assert blk in accessed
assert at_prev < lastRevOf(blk)
# blk ∉ pin -> last blk revision is ≤ at
assert lastRevOf(blk) <= at
pin[blk] = None # @head
# blk ∈ pin_prev, blk ∈ pin -> if rev different: use pin
elif blk in pin_prev and blk in pin:
# blk ∈ pin_prev, pin -> blk is tracked; has rev > at_prev, at
assert blk in accessed
assert at_prev < lastRevOf(blk)
assert at < lastRevOf(blk)
assert pin_prev[blk] <= pin[blk]
if pin_prev[blk] == pin[blk]:
del pin[blk] # no need to pin - it is already pinned at this revision
#print('-> %s' % t.hpin(pin))
# {} blk -> at that have to be pinned.
if pinok is not None:
assert pin == pinok, "computed vs explicit pinok"
pinok = pin
print('# pinok: %s' % t.hpin(pinok))
# send watch request and check that we receive pins for tracked (previously
# accessed at least once) blocks changed > at.
twlink._watch(zf, at, pinok, "ok")
w.at = at
# `watch ... -> at_i -> at_j` must be the same as `watch ø -> at_j`
assert w.pinned == t._pinnedAt(zf, at)
return w
# stop_watch instructs wlink to stop watching the file.
@func(tWatchLink)
def stop_watch(twlink, zf):
foid = zf._p_oid
assert foid in twlink._watching
w = twlink._watching.pop(foid)
twlink._watch(zf, b"-", {}, "ok")
w.at = z64
w.pinned = {}
# _watch sends watch request for zf@at, expects initial pins specified by pinok and final reply.
#
# at also can be b"-" which means "stop watching"
#
# pinok: {} blk -> at that have to be pinned.
# if replyok ends with '…' only reply prefix until the dots is checked.
@func(tWatchLink)
def _watch(twlink, zf, at, pinok, replyok):
if at == b"-":
xat = at
else:
xat = b"@%s" % h(at)
def _(ctx, ev):
reply = twlink.sendReq(ctx, b"watch %s %s" % (h(zf._p_oid), xat))
if replyok.endswith('…'):
rok = replyok[:-len('…')]
assert reply[:len(rok)] == rok
else:
assert reply == replyok
doCheckingPin(_, {twlink: (zf, pinok)})
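# For illustration, a watch setup driven by _watch + doCheckingPin/_expectPin above
# exchanges messages of the following shape over /head/watch (framing and stream ids
# are handled by WatchLink and are not shown):
#
#   C -> S: watch <foid> @<at>
#   S -> C: pin <foid> #<blk> @<rev>     ; one per block that has to be pinned
#   C -> S: ack                          ; reply to each pin request
#   S -> C: ok                           ; final reply to the watch request
#
# an unpin request uses "@head" in place of a concrete revision.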
# doCheckingPin calls f and verifies that wcfs sends expected pins during the
# time f executes.
#
# f(ctx, eventv)
# pinokByWLink: {} tWatchLink -> (zf, {} blk -> at).
# pinfunc(wlink, foid, blk, at) | None.
#
# pinfunc is called after pin request is received from wcfs, but before pin ack
# is replied back. Pinfunc must not block.
def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
# call f and check that we receive pins as specified.
# Use timeout to detect wcfs replying fewer pins than expected.
#
# XXX detect not sent pins via ack'ing previous pins as they come in (not
# waiting for all of them) and then seeing that we did not receive expected
# pin when f completes?
ctx, cancel = with_timeout()
wg = sync.WorkGroup(ctx)
ev = []
for wlink, (zf, pinok) in pinokByWLink.items():
def _(ctx, wlink, zf, pinok):
w = wlink._watching.get(zf._p_oid)
if len(pinok) > 0:
assert w is not None
pinv = wlink._expectPin(ctx, zf, pinok)
if len(pinv) > 0:
ev.append('pin rx') # XXX + zf, pin details?
# increase probability to receive erroneous extra pins
tdelay()
if len(pinv) > 0:
if pinfunc is not None:
for p in pinv:
pinfunc(wlink, p.foid, p.blk, p.at)
ev.append('pin ack pre') # XXX +details?
for p in pinv:
assert w.foid == p.foid
if p.at is None: # unpin to @head
assert p.blk in w.pinned # must have been pinned before
del w.pinned[p.blk]
else:
w.pinned[p.blk] = p.at
#p.reply(b"ack")
wlink.replyReq(ctx, p, b"ack")
# check that we don't get extra pins before f completes
try:
req = wlink.recvReq(ctx)
except Exception as e:
if errors.Is(e, context.canceled):
return # cancel is expected after f completes
raise
fail("extra pin message received: %r" % req.msg)
wg.go(_, wlink, zf, pinok)
def _(ctx):
f(ctx, ev)
# cancel _expectPin waiting upon completing f
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
return ev
# _expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
@func(tWatchLink)
def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
expected = set() # of expected pin messages
for blk, at in expect.items():
hat = h(at) if at is not None else 'head'
msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, hat)
assert msg not in expected
expected.add(msg)
reqv = [] # of received requests
while len(expected) > 0:
try:
req = twlink.recvReq(ctx)
except Exception as e:
raise RuntimeError("%s\nnot all pin messages received - pending:\n%s" % (e, expected))
assert req is not None # channel not closed
assert req.msg in expected
expected.remove(req.msg)
reqv.append(req)
return reqv
# ---- infrastructure: helpers to query dFtail/accessed history ----
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
#
# If the block is a hole, (b'', at0) is returned. XXX -> @z64?
# XXX what to return when the file did not exist at all? when blk was past file size?
@func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None:
at = t.head
# all changes to zf
vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]
# changes to zf[blk] <= at
blkhistoryat = [_ for _ in vdf if blk in _.ddata and _.rev <= at]
if len(blkhistoryat) == 0:
# blk did not exist @at # XXX verify whether the file existed at all
data = b''
rev = t.dFtail[0].rev # was hole - at0 XXX -> pin to z64
else:
_ = blkhistoryat[-1]
data = _.ddata[blk]
rev = _.rev
assert rev <= at
return data, rev
# _blkRevAt returns expected zf[blk] revision as of @at database state.
@func(tDB)
def _blkRevAt(t, zf, blk, at): # -> rev
_, rev = t._blkDataAt(zf, blk, at)
return rev
# _pinnedAt returns which blocks need to be pinned for zf@at compared to zf@head
# according to wcfs isolation protocol.
#
# Criteria for when blk must be pinned as of @at view:
#
# blk ∈ pinned(at) <=> 1) ∃ r = rev(blk): at < r ; blk was changed after at
# 2) blk ∈ tracked ; blk was accessed at least once
# ; (and so is tracked by wcfs)
@func(tDB)
def _pinnedAt(t, zf, at): # -> pin = {} blk -> rev
# all changes to zf
vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]
# {} blk -> at for changes ∈ (at, head]
pin = {}
for df in [_ for _ in vdf if _.rev > at]:
for blk in df.ddata:
if blk in pin:
continue
if blk in t._blkaccessed(zf):
pin[blk] = t._blkRevAt(zf, blk, at)
return pin
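# For example (illustration only): if head/zf block 2 was accessed at least once,
# block 5 never was, and zf was changed as {2} @at1, {2} @at2 and {2,5} @at3 (= head),
# then
#
#   _pinnedAt(zf, at1) = {2: at1}   ; blk5 also changed after at1, but is not tracked
#   _pinnedAt(zf, at2) = {2: at2}
#   _pinnedAt(zf, at3) = {}         ; nothing changed after at3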
# iter_revv iterates through all possible at_i -> at_j -> at_k ... sequences.
# at_i < at_j. NOTE all sequences go till head.
@func(tDB)
def iter_revv(t, start=z64, level=0):
dFtail = [_ for _ in t.dFtail if _.rev > start]
#print(' '*level, 'iter_revv', start, [_.rev for _ in dFtail])
if len(dFtail) == 0:
yield []
return
for dF in dFtail:
#print(' '*level, 'QQQ', dF.rev)
for tail in t.iter_revv(start=dF.rev, level=level+1):
#print(' '*level, 'zzz', tail)
yield ([dF.rev] + tail)
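# For example (illustration only): with t.dFtail revisions r1 < r2 < r3 and the
# default start, iter_revv yields
#
#   [r1, r2, r3], [r1, r3], [r2, r3], [r3]
#
# i.e. every strictly increasing sequence of revisions that ends at head.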
# -------------------------------------
# ---- actual tests to access data ----
# exercise wcfs functionality without wcfs isolation protocol.
# plain data access + wcfs handling of ZODB invalidations.
@func
def test_wcfs_basic():
t = tDB(); zf = t.zfile
defer(t.close)
# >>> lookup non-BigFile -> must be rejected
with raises(OSError) as exc:
t.wc._stat("head/bigfile/%s" % h(t.nonzfile._p_oid))
assert exc.value.errno == EINVAL
# >>> file initially empty
f = t.open(zf)
f.assertCache([])
f.assertData ([], mtime=t.at0)
# >>> (@at1) commit data -> we can see it on wcfs
at1 = t.commit(zf, {2:'c1'})
f.assertCache([0,0,0]) # initially not cached
f.assertData (['','','c1'], mtime=t.head)
# >>> (@at2) commit again -> we can see both latest and snapshotted states
# NOTE blocks e(4) and f(5) will be accessed only in the end
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
# f @head
f.assertCache([1,1,0,0,0,0])
f.assertData (['','', 'c2', 'd2', 'x','x'], mtime=t.head)
f.assertCache([1,1,1,1,0,0])
# f @at1
f1 = t.open(zf, at=at1)
f1.assertCache([0,0,1])
f1.assertData (['','','c1']) # XXX + mtime=at1?
# >>> (@at3) commit again without changing zf size
f2 = t.open(zf, at=at2)
at3 = t.commit(zf, {2:'c3', 5:'f3'}) # FIXME + a3 after δbtree works (hole -> zblk)
f.assertCache([1,1,0,1,0,0])
# f @head is opened again -> cache must not be lost
f_ = t.open(zf)
f_.assertCache([1,1,0,1,0,0])
f_.close()
f.assertCache([1,1,0,1,0,0])
# f @head
f.assertCache([1,1,0,1,0,0])
f.assertData (['','','c3','d2','x','x'], mtime=t.head)
# f @at2
# NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed
f2.assertCache([0,0,1,0,0,0])
f2.assertData (['','','c2','d2','','f2']) # XXX mtime=at2?
# f @at1
f1.assertCache([1,1,1])
f1.assertData (['','','c1']) # XXX mtime=at1?
# >>> f close / open again -> cache must not be lost
# XXX a bit flaky since OS can evict whole f cache under pressure
f.assertCache([1,1,1,1,0,0])
f.close()
f = t.open(zf)
if f.cached() != [1,1,1,1,0,0]:
assert sum(f.cached()) > 4*1/2 # > 50%
# verify all blocks
f.assertData(['','','c3','d2','','f3'])
f.assertCache([1,1,1,1,1,1])
# verify how wcfs processes ZODB invalidations when hole becomes a block with data.
@func
def test_wcfs_basic_hole2zblk():
t = tDB(); zf = t.zfile
defer(t.close)
f = t.open(zf)
t.commit(zf, {2:'c1'}) # b & a are holes
f.assertCache([0,0,0])
f.assertData(['','','c1'])
t.commit(zf, {1:'b2'}) # hole -> zblk
f.assertCache([1,0,1])
f.assertData(['','b2','c1'])
# XXX ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2 (δbtree)
# XXX ZBlk moved from blk1 -> blk2 ; for the same file and for file1 -> file2 (δbtree)
# verify that read after file size returns (0, ok)
# (the same behaviour as on e.g. ext4 and as required by POSIX)
@func
def test_wcfs_basic_read_aftertail():
t = tDB(); zf = t.zfile
defer(t.close)
t.commit(zf, {2:'c1'})
f = t.open(zf)
f.assertData(['','','c1'])
def _(off): # -> bytes read from f[off +4)
buf = bytearray(4)
n = io.readat(f.f.fileno(), off, buf)
return bytes(buf[:n])
assert _(0*blksize) == b'\x00\x00\x00\x00'
assert _(1*blksize) == b'\x00\x00\x00\x00'
assert _(2*blksize) == b'c1\x00\x00'
assert _(3*blksize-4) == b'\x00\x00\x00\x00'
assert _(3*blksize-3) == b'\x00\x00\x00'
assert _(3*blksize-2) == b'\x00\x00'
assert _(3*blksize-1) == b'\x00'
assert _(3*blksize-0) == b''
assert _(3*blksize+1) == b''
assert _(3*blksize+2) == b''
assert _(3*blksize+3) == b''
assert _(4*blksize) == b''
assert _(8*blksize) == b''
assert _(100*blksize) == b''
# ---- verify wcfs functionality that depends on isolation protocol ----
# verify that watch setup is robust to client errors/misbehaviour.
@func
def test_wcfs_watch_robust():
t = tDB(); zf = t.zfile
defer(t.close)
# sysread(/head/watch) can be interrupted
p = subprocess.Popen(["%s/testprog/wcfs_readcancel.py" %
os.path.dirname(__file__), t.wc.mountpoint])
procwait(timeout(), p)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
# file not yet opened on wcfs
wl = t.openwatch()
assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
"error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
"file not yet known to wcfs or is not a ZBigFile"
wl.close()
# closeTX/bye cancels blocked pin handlers
f = t.open(zf)
f.assertBlk(2, 'c2')
f.assertCache([0,0,1])
wl = t.openwatch()
wg = sync.WorkGroup(timeout())
def _(ctx):
# TODO clarify what wcfs should do if pin handler closes wlink TX:
# - reply error + close, or
# - just close
# t = when reviewing WatchLink.serve in wcfs.go
#assert wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
# "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
# "pin #%d @%s: context canceled" % (2, h(at1))
#with raises(error, match="unexpected EOF"):
with raises(error, match="recvReply: link is down"):
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
wg.go(_)
def _(ctx):
req = wl.recvReq(ctx)
assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# don't reply to req - close instead
wl.closeWrite()
wg.go(_)
wg.wait()
wl.close()
# NOTE if wcfs.go does not fully cleanup this canceled watch and leaves it
# in half-working state, it will break on further commit, as pin to the
# watch won't be handled.
at3 = t.commit(zf, {2:'c3'})
# invalid requests -> wcfs replies error
wl = t.openwatch()
assert wl.sendReq(timeout(), b'bla bla') == \
b'error bad watch: not a watch request: "bla bla"'
# invalid request not following frame structure -> fatal + wcfs must close watch link
assert wl.fatalv == []
_twlinkwrite(wl, b'zzz hello\n')
_, _rx = select(
timeout().done().recv,
wl.rx_eof.recv,
)
if _ == 0:
raise RuntimeError("%s: did not rx EOF after bad frame " % wl)
assert wl.fatalv == [b'error: invalid frame: "zzz hello\\n" (invalid stream)']
wl.close()
# watch with @at < δtail.tail -> rejected
wl = t.openwatch()
atpast = p64(u64(t.tail)-1)
wl._watch(zf, atpast, {}, "error setup watch f<%s> @%s: too far away back from"
" head/at (@%s); …" % (h(zf._p_oid), h(atpast), h(t.head)))
wl.close()
# verify that `watch file @at` -> error, for @at when the file did not exist.
@xfail # check that file exists @at
@func
def test_wcfs_watch_before_create():
t = tDB(); zf = t.zfile
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
zf2 = t.root['zfile2'] = ZBigFile(blksize) # zf2 created @at2
at2 = t.commit()
at3 = t.commit(zf2, {1:'β3'})
# force wcfs to access/know zf2
f2 = t.open(zf2)
f2.assertData(['','β3'])
wl = t.openwatch()
assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf2._p_oid), h(at1))) == \
"error setup watch f<%s> @%s: " % (h(zf2._p_oid), h(at1)) + \
"file does not exist at that database state"
wl.close()
# verify that watch @at_i -> @at_j ↓ is rejected
# XXX we might want to allow going back in history later.
@func
def test_wcfs_watch_going_back():
t = tDB(); zf = t.zfile
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
f = t.open(zf)
f.assertData(['','','c2'])
wl = t.openwatch()
wl.watch(zf, at2, {})
assert wl.sendReq(timeout(), b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
"error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
"going back in history is forbidden"
wl.close()
# verify that wcfs kills slow/faulty client who does not reply to pin in time.
@xfail # protection against faulty/slow clients
@func
def test_wcfs_pintimeout_kill():
# adjusted wcfs timeout to kill client who is stuck not providing pin reply
tkill = 3*time.second
t = tDB(); zf = t.zfile # XXX wcfs args += tkill
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
f = t.open(zf)
f.assertData(['','','c2'])
# XXX move into subprocess not to kill whole testing
ctx, _ = context.with_timeout(context.background(), 2*tkill)
wl = t.openwatch()
wg = sync.WorkGroup(ctx)
def _(ctx):
# send watch. The pin handler won't be replying -> we should never get reply here.
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
fail("watch request completed (should not as pin handler is stuck)")
wg.go(_)
def _(ctx):
req = wl.recvReq(ctx)
assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# sleep > wcfs pin timeout - wcfs must kill us
_, _rx = select(
ctx.done().recv, # 0
time.after(tkill).recv, # 1
)
if _ == 0:
raise ctx.err()
fail("wcfs did not killed stuck client")
wg.go(_)
wg.wait()
# watch with @at > head - must wait for head to become >= at.
# XXX too far ahead - reject?
@func
def test_wcfs_watch_setup_ahead():
t = tDB(); zf = t.zfile
defer(t.close)
f = t.open(zf)
at1 = t.commit(zf, {2:'c1'})
f.assertData(['','x','c1']) # NOTE #1 not accessed for watch @at1 to receive no pins
wg = sync.WorkGroup(timeout())
dt = 100*time.millisecond
committing = chan() # becomes ready when T2 starts to commit
# T1: watch @(at1+1·dt)
@func
def _(ctx):
wl = t.openwatch()
defer(wl.close)
wat = tidfromtime(tidtime(at1) + 1*dt) # > at1, but < at2
rx = wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(wat)))
assert ready(committing)
assert rx == b"ok"
wg.go(_)
# T2: sleep(10·dt); commit
@func
def _(ctx):
# reopen connection to database as we are committing from another thread
conn = t.root._p_jar.db().open()
defer(conn.close)
root = conn.root()
zf = root['zfile']
time.sleep(10*dt)
committing.close()
at2 = t.commit(zf, {1:'b2'})
assert tidtime(at2) - tidtime(at1) >= 10*dt
wg.go(_)
wg.wait()
# verify that watch setup/update sends correct pins.
@func
def test_wcfs_watch_setup():
t = tDB(); zf = t.zfile; at0=t.at0
defer(t.close)
f = t.open(zf)
at1 = t.commit(zf, {2:'c1'}) # XXX + hole -> zblk
at2 = t.commit(zf, {2:'c2', 3:'d2', 4:'e2', 5:'f2'})
at3 = t.commit(zf, {2:'c3', 5:'f3'})
f.assertData(['','','c3','d2','x','f3']) # access everything except e as of @at3
f.assertCache([1,1,1,1,0,1])
# change again, but don't access e and f
at4 = t.commit(zf, {2:'c4', 4:'e4', 5:'f4'})
at5 = t.commit(zf, {3:'d5', 5:'f5'})
f.assertData(['','','c4','d5','x','x'])
f.assertCache([1,1,1,1,0,0])
# some watch setup/update requests with explicit pinok (also partly
# verifies how tWatchLink.watch computes automatic pinok)
# new watch setup ø -> at
def assertNewWatch(at, pinok):
wl = t.openwatch()
wl.watch(zf, at, pinok)
wl.close()
assertNewWatch(at1, {2:at1, 3:at0, 5:at0})
assertNewWatch(at2, {2:at2, 3:at2, 5:at2})
assertNewWatch(at3, {2:at3, 3:at2, 5:at3}) # f(5) is pinned, even though it was not
assertNewWatch(at4, { 3:at2, 5:at4}) # accessed after at3
assertNewWatch(at5, { })
# new watch + update at_i -> at_j
wl = t.openwatch()
# XXX check @at0 ?
wl.watch(zf, at1, {2:at1, 3:at0, 5:at0}) # -> at1 (new watch) XXX at0 -> ø?
wl.watch(zf, at2, {2:at2, 3:at2, 5:at2}) # at1 -> at2
wl.watch(zf, at3, {2:at3, 5:at3}) # at2 -> at3
wl.watch(zf, at4, {2:None, 5:at4}) # at3 -> at4; f(5) pinned even though it was not accessed at >= at4
wl.watch(zf, at5, { 3:None, 5:None}) # at4 -> at5 (current head)
wl.close()
# all valid watch setup/update requests going at_i -> at_j -> ... with automatic pinok
for zf in t.zfiles():
for revv in t.iter_revv():
print('\n--------')
print(' -> '.join(['%s' % _ for _ in revv])) # XXX join joins bytes as raw
wl = t.openwatch()
wl.watch(zf, revv[0])
wl.watch(zf, revv[0]) # verify at_i -> at_i
for at in revv[1:]:
wl.watch(zf, at)
wl.close()
# verify that already setup watch(es) receive correct pins on block access.
@func
def test_wcfs_watch_vs_access():
t = tDB(); zf = t.zfile; at0=t.at0
defer(t.close)
f = t.open(zf)
at1 = t.commit(zf, {2:'c1'}) # XXX + hole -> zblk
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
at3 = t.commit(zf, {2:'c3', 5:'f3'})
f.assertData(['','','c3','d2','x','x'])
f.assertCache([1,1,1,1,0,0])
# watched + commit -> read -> receive pin messages.
# read vs pin ordering is checked by assertBlk.
#
# f(5) is kept not accessed to check later how wcfs.go handles δFtail
# rebuild after it sees not yet accessed ZBlk that has change history.
wl3 = t.openwatch(); w3 = wl3.watch(zf, at3); assert at3 == t.head
assert w3.at == at3
assert w3.pinned == {}
wl3_ = t.openwatch(); w3_ = wl3_.watch(zf, at3)
assert w3_.at == at3
assert w3_.pinned == {}
wl2 = t.openwatch(); w2 = wl2.watch(zf, at2)
assert w2.at == at2
assert w2.pinned == {2: at2}
# w_assertPin asserts on state of .pinned for {w3,w3_,w2}
def w_assertPin(pinw3, pinw3_, pinw2):
assert w3.pinned == pinw3
assert w3_.pinned == pinw3_
assert w2.pinned == pinw2
f.assertCache([1,1,1,1,0,0])
at4 = t.commit(zf, { 2:'c4', 5:'f4', 6:'g4'}) # FIXME + b4 after δbtree works + update vvv
f.assertCache([1,1,0,1,0,0,0])
f.assertBlk(0, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {}, {}, {2:at2})
f.assertBlk(1, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {}, {}, {2:at2})
f.assertBlk(2, 'c4', {wl3: {2:at3}, wl3_: {2:at3}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
f.assertBlk(3, 'd2', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
# blk4 is hole @head - the same as at earlier db view - not pinned
f.assertBlk(4, '', {wl3: {}, wl3_: {}, wl2: {}})
w_assertPin( {2:at3}, {2:at3}, {2:at2})
# f(5) is kept unaccessed (see ^^^)
assert f.cached()[5] == 0
f.assertBlk(6, 'g4', {wl3: {6:at0}, wl3_: {6:at0}, wl2: {6:at0}}) # XXX at0->ø?
w_assertPin( {2:at3, 6:at0}, {2:at3, 6:at0}, {2:at2, 6:at0})
# commit again:
# - c(2) is already pinned -> wl3 not notified
# - watch stopped (wl3_) -> watch no longer notified
# - wlink closed (wl2) -> watch no longer notified
# - f(5) is still kept unaccessed (see ^^^)
f.assertCache([1,1,1,1,1,0,1])
at5 = t.commit(zf, {2:'c5', 3:'d5', 5:'f5'})
f.assertCache([1,1,0,0,1,0,1])
wl3_.stop_watch(zf) # w3_ should not be notified
wl2.close() # wl2:* should not be notified
def w_assertPin(pinw3):
assert w3.pinned == pinw3
assert w3_.pinned == {}; assert w3_.at == z64 # wl3_ unsubscribed from zf
assert w2.pinned == {}; assert w2.at == z64 # wl2 closed
f.assertBlk(0, '', {wl3: {}, wl3_: {}}) # no change
w_assertPin( {2:at3, 6:at0})
f.assertBlk(1, '', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 6:at0})
f.assertBlk(2, 'c5', {wl3: {}, wl3_: {}}) # c(2) already pinned on wl3
w_assertPin( {2:at3, 6:at0})
f.assertBlk(3, 'd5', {wl3: {3:at2}, wl3_: {}}) # d(3) was not pinned on wl3; wl3_ not notified
w_assertPin( {2:at3, 3:at2, 6:at0})
f.assertBlk(4, '', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 3:at2, 6:at0})
# f(5) is kept still unaccessed (see ^^^)
assert f.cached()[5] == 0
f.assertBlk(6, 'g4', {wl3: {}, wl3_: {}})
w_assertPin( {2:at3, 3:at2, 6:at0})
# advance watch - receives new pins/unpins to @head.
# this is also tested ^^^ in `at_i -> at_j -> ...` watch setup/adjust.
# NOTE f(5) is not affected because it was not pinned previously.
wl3.watch(zf, at4, {2:at4, 6:None}) # at3 -> at4
w_assertPin( {2:at4, 3:at2})
# access f(5) -> wl3 should be correctly pinned
assert f.cached() == [1,1,1,1,1,0,1] # f(5) was not yet accessed
f.assertBlk(5, 'f5', {wl3: {5:at4}, wl3_: {}})
w_assertPin( {2:at4, 3:at2, 5:at4})
# advance watch again
wl3.watch(zf, at5, {2:None, 3:None, 5:None}) # at4 -> at5
w_assertPin( {})
wl3.close()
# verify that on pin message, while under pagefault, we can mmap @at/f[blk]
# into where head/f[blk] was mmaped; the result of original pagefaulting read
# must be from newly inserted mapping.
#
# TODO same with two mappings to the same file, but only one changing blk mmap
# -> one read gets changed data, one read gets data from @head.
@func
def test_wcfs_remmap_on_pin():
t = tDB(); zf = t.zfile
defer(t.close)
at1 = t.commit(zf, {2:'hello'})
at2 = t.commit(zf, {2:'world'})
f = t.open(zf)
f1 = t.open(zf, at=at1)
wl = t.openwatch()
wl.watch(zf, at1, {})
f.assertCache([0,0,0])
def _(wlink, foid, blk, at):
assert wlink is wl
assert foid == zf._p_oid
assert blk == 2
assert at == at1
mm.map_into_ro(f._blk(blk), f1.f.fileno(), blk*f.blksize)
f._assertBlk(2, 'hello', {wl: {2:at1}}, pinfunc=_) # NOTE not world
# verify that pin message is not sent for the same blk@at twice.
@func
def test_wcfs_no_pin_twice():
t = tDB(); zf = t.zfile
defer(t.close)
f = t.open(zf)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
wl = t.openwatch()
w = wl.watch(zf, at1, {})
f.assertCache([0,0,0])
f.assertBlk(2, 'c2', {wl: {2:at1}})
f.assertCache([0,0,1])
assert w.pinned == {2:at1}
# drop file[blk] from cache, access again -> no pin message sent the second time
#
# ( we need both madvise(DONTNEED) and fadvise(DONTNEED) - given only one of
# those the kernel won't release the page from pagecache; madvise does
# not work without munlock. )
mm.unlock(f._blk(2))
mm.advise(f._blk(2), mm.MADV_DONTNEED)
fadvise_dontneed(f.f.fileno(), 2*blksize, 1*blksize)
f.assertCache([0,0,0])
f.assertBlk(2, 'c2', {wl: {}})
f.assertCache([0,0,1])
# verify watching for 2 files over single watch link.
@func
def test_wcfs_watch_2files():
t = tDB(); zf1 = t.zfile
defer(t.close)
t.root['zfile2'] = zf2 = ZBigFile(blksize)
t.commit()
t.change(zf1, {0:'a2', 2:'c2'})
t.change(zf2, {1:'β2', 3:'δ2'})
at2 = t.commit()
t.change(zf1, {0:'a3', 2:'c3'})
t.change(zf2, {1:'β3', 3:'δ3'})
at3 = t.commit()
f1 = t.open(zf1)
f2 = t.open(zf2)
f1.assertData(['a3', '', 'x' ])
f2.assertData(['', 'β3', '', 'x'])
wl = t.openwatch()
w1 = wl.watch(zf1, at2, {0:at2})
w2 = wl.watch(zf2, at2, {1:at2})
def w_assertPin(pinw1, pinw2):
assert w1.pinned == pinw1
assert w2.pinned == pinw2
w_assertPin( {0:at2}, {1:at2})
f1.assertBlk(2, 'c3', {wl: {2:at2}})
w_assertPin( {0:at2, 2:at2}, {1:at2})
f2.assertBlk(3, 'δ3', {wl: {3:at2}})
w_assertPin( {0:at2, 2:at2}, {1:at2, 3:at2})
wl.watch(zf1, at3, {0:None, 2:None})
w_assertPin( {}, {1:at2, 3:at2})
wl.watch(zf2, at3, {1:None, 3:None})
w_assertPin( {}, {})
# XXX new watch request while previous watch request is in progress (over the same /head/watch handle)
# XXX @revX/ is automatically removed after some time
# ---- misc ----
# readfile reads file @ path.
def readfile(path):
with open(path) as f:
return f.read()
# writefile writes data to file @ path.
def writefile(path, data):
with open(path, "w") as f:
f.write(data)
# tidtime converts tid to transaction commit time.
def tidtime(tid):
t = TimeStamp(tid).timeTime()
# ZODB/py vs ZODB/go time resolution is not better than 1µs
# see e.g. https://lab.nexedi.com/kirr/neo/commit/9112f21e
#
# NOTE pytest.approx supports only ==, not e.g. <, so we use plain round.
return round(t, 6)
# tidfromtime converts time into corresponding transaction ID.
def tidfromtime(t):
f = t - int(t) # fraction of seconds
t = int(t)
_ = gmtime(t)
s = _.tm_sec + f # total seconds
ts = TimeStamp(_.tm_year, _.tm_mon, _.tm_mday, _.tm_hour, _.tm_min, s)
return ts.raw()
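# A minimal sketch (illustration only, not used by the tests) of the round-trip that
# test_tidtime below verifies: tid -> time -> tid is stable only up to the ~1µs
# TimeStamp resolution.
def _tidtime_roundtrip_error(at):  # -> |error| in seconds of at -> time -> tid -> time
    return abs(tidtime(tidfromtime(tidtime(at))) - tidtime(at))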
# verify that tidtime is precise enough to show difference in between transactions.
# verify that tidtime -> tidfromtime is identity within rounding tolerance.
@func
def test_tidtime():
t = tDB()
defer(t.close)
# tidtime not rough
atv = [t.commit()]
for i in range(10):
at = t.commit()
assert tidtime(at) > tidtime(atv[-1])
atv.append(at)
# tidtime -> tidfromtime
for at in atv:
tat = tidtime(at)
at_ = tidfromtime(tat)
tat_ = tidtime(at_)
assert abs(tat_ - tat) <= 2E-6
# tAt is bytes whose repr returns human readable string considering it as `at` under tDB.
#
# It gives both symbolic version and raw hex forms, for example:
# @at2 (03cf7850500b5f66)
#
# tAt is used everywhere so that if an assert comparing at values, or e.g. dicts
# containing them, fails, everything is printed in human-readable form instead of
# raw hex that is hard to visually map to a logical transaction.
class tAt(bytes):
def __new__(cls, tdb, at):
tat = bytes.__new__(cls, at)
tat.tdb = tdb
return tat
def __repr__(at):
t = at.tdb
for i, dF in enumerate(t.dFtail):
if dF.rev == at:
return "@at%d (%s)" % (i, h(at))
return "@" + h(at)
__str__ = __repr__
# hpin returns human-readable representation for {}blk->rev.
@func(tDB)
def hpin(t, pin):
pinv = []
for blk in sorted(pin.keys()):
if pin[blk] is None:
s = '@head'
else:
s = '%s' % pin[blk]
pinv.append('%d: %s' % (blk, s))
return '{%s}' % ', '.join(pinv)
# zfiles returns ZBigFiles that were ever changed under t.
@func(tDB)
def zfiles(t):
zfs = set()
for dF in t.dFtail:
for zf in dF.byfile:
if zf not in zfs:
zfs.add(zf)
return zfs
# dump_history prints t's change history in tabular form.
#
# the output is useful while developing or analyzing a test failure: to get
# an overview of how file(s) are changed in tests.
@func(tDB)
def dump_history(t):
print('>>> Change history by file:')
for zf in t.zfiles():
print('\nf<%s>:' % h(zf._p_oid))
indent = '\t%s\t' % (' '*len('%s' % t.head),)
print('%s%s' % (indent, ' '.join('01234567')))
print('%s%s' % (indent, ' '.join('abcdefgh')))
for dF in t.dFtail:
df = dF.byfile.get(zf)
emitv = []
if df is not None:
dblk = set(df.ddata.keys())
for blk in range(max(dblk)+1):
if blk in dblk:
emitv.append('%d' % blk)
else:
emitv.append(' ')
print('\t%s\t%s' % (dF.rev, ' '.join(emitv)))
print()
# ready reports whether chan ch is ready.
def ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
return bool(_)
# procwait waits for a process (subprocess.Popen) to terminate.
def procwait(ctx, proc):
wg = sync.WorkGroup(ctx)
def _(ctx):
while 1:
if ready(ctx.done()):
raise ctx.err()
ret = proc.poll()
if ret is not None:
return
tdelay()
wg.go(_)
wg.wait()
# procwait_, similarly to procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns whether the process terminated or not - e.g. it returns False if the context was canceled.
def procwait_(ctx, proc): # -> ok
try:
procwait(ctx, proc)
except Exception as e:
if errors.Is(e, context.canceled) or errors.Is(e, context.deadlineExceeded):
return False
raise
return True
# is_mountpoint returns whether path is a mountpoint
def is_mountpoint(path): # -> bool
# NOTE we don't call mountpoint directly on path, because if FUSE
# fileserver failed, the mountpoint will also fail and print ENOTCONN
try:
_ = os.lstat(path)
except OSError as e:
if e.errno == ENOENT:
return False
# "Transport endpoint is not connected" -> it is a failed FUSE server
# (XXX we can also grep /proc/mounts)
if e.errno == ENOTCONN:
return True
raise
if not S_ISDIR(_.st_mode):
return False
mounted = (0 == subprocess.call(["mountpoint", "-q", path]))
return mounted
# eprint prints msg to stderr
def eprint(msg):
print(msg, file=sys.stderr)
@@ -67,74 +67,11 @@ type zBlk interface {
	// returns data and revision of ZBlk.
	loadBlkData(ctx context.Context) (data []byte, rev zodb.Tid, _ error)
-	// inΔFtail returns pointer to struct zblkInΔFtail embedded into this ZBlk.
-	// inΔFtail() *zblkInΔFtail
-	// XXX kill - in favour of inΔFtail
-	/*
-	// bindFile associates ZBlk as being used by file to store block #blk.
-	//
-	// A ZBlk may be bound to several blocks inside one file, and to
-	// several files.
-	//
-	// The information is preserved even when ZBlk comes to ghost
-	// state, but is lost if ZBlk is garbage collected.
-	//
-	// it is safe to call multiple bindFile simultaneously.
-	// it is not safe to call bindFile and boundTo simultaneously.
-	//
-	// XXX link to overview.
-	bindFile(file *BigFile, blk int64)
-	// XXX unbindFile
-	// XXX zfile -> bind map for it
-	// blkBoundTo returns ZBlk association with file(s)/#blk(s).
-	//
-	// The association returned is that was previously set by bindFile.
-	//
-	// blkBoundTo must not be called simultaneously wrt bindFile.
-	blkBoundTo() map[*BigFile]SetI64
-	*/
}
var _ zBlk = (*ZBlk0)(nil)
var _ zBlk = (*ZBlk1)(nil)
-// XXX kill
-/*
-// ---- zBlkBase ----
-// zBlkBase provides common functionality to implement ZBlk* -> zfile, #blk binding.
-//
-// The data stored by zBlkBase is transient - it is _not_ included into
-// persistent state.
-type zBlkBase struct {
-	bindMu sync.Mutex // used only for binding to support multiple loaders
-	infile map[*BigFile]SetI64 // {} file -> set(#blk)
-}
-// bindFile implements zBlk.
-func (zb *zBlkBase) bindFile(file *BigFile, blk int64) {
-	zb.bindMu.Lock()
-	defer zb.bindMu.Unlock()
-	blkmap, ok := zb.infile[file]
-	if !ok {
-		blkmap = make(SetI64, 1)
-		if zb.infile == nil {
-			zb.infile = make(map[*BigFile]SetI64)
-		}
-		zb.infile[file] = blkmap
-	}
-	blkmap.Add(blk)
-}
-// blkBoundTo implementss zBlk.
-func (zb *zBlkBase) blkBoundTo() map[*BigFile]SetI64 {
-	return zb.infile
-}
-*/
// ---- ZBlk0 ----
...