Commit b221b0b6 authored by Kirill Smelkov

Merge branch 't' into t2

* t:
  .
  X wcfs: Fix ZSync to close wconn on zdb.close, even if zconn stays alive
  X lib/zodb: Connection += onShutdownCallback
  .
  X wcfs: lsof +D misbehaves - don't use it
  X wcfs: _fuse_unmount: Try first `kill -TERM` before `kill -QUIT` wcfs
  X wcfs: Tune _fuse_unmount to include `fusermount -u` error message into raised exception
  X wcfs: Teach start to start successfully even after unclean wcfs shutdown
  fixup! X wcfs: Run fusermount and friends with /bin:/usr/bin always on path
  X wcfs: Run fusermount and friends with /bin:/usr/bin always on path
  X wcfs: Add start to spawn a Server that can be later stopped  (draft)
parents 68f6e672 c144b4a4
@@ -46,8 +46,18 @@ def transaction_reset():
# nothing to run after test
# Before pytest exits, teardown WCFS(s) that we automatically spawned during
# test runs in bigfile/bigarray/...
# enable log_cli on no-capture
# (output during a test is a mixture of print and log)
def pytest_configure(config):
if config.option.capture == "no":
config.inicfg['log_cli'] = "true"
assert config.getini("log_cli") is True
if config.option.verbose:
config.inicfg['log_cli_level'] = "INFO"
# Before pytest exits, teardown WCFS server(s) that we automatically spawned
# during test runs in bigfile/bigarray/...
#
# If we do not do this, spawned wcfs servers are left running _and_ connected
# by stdout to nxdtest input - which makes nxdtest wait for them to exit.
@@ -58,18 +68,13 @@ def pytest_unconfigure(config):
gc.collect()
from wendelin import wcfs
for wc in wcfs._wcstarted:
if wc._proc.poll() is not None:
continue # this wcfs server already exited
for wc in wcfs._wcautostarted:
# NOTE: defer instead of direct call - to call all wc.close if there
# were multiple wc spawned, and to proceed till the end even if any
# particular call raises an exception.
defer(partial(_wcclose, wc))
defer(partial(_wcclose_and_stop, wc))
def _wcclose(wc):
from wendelin.wcfs.wcfs_test import tWCFS
print("# unmount/stop wcfs pid%d @ %s" % (wc._proc.pid, wc.mountpoint))
twc = tWCFS(wc=wc)
twc.close()
@func
def _wcclose_and_stop(wc):
defer(wc._wcsrv.stop)
defer(wc.close)
@@ -28,6 +28,7 @@ from BTrees.IOBTree import IOBTree
import transaction
from transaction import TransactionManager
from golang import defer, func
import weakref, gc
from pytest import raises
import pytest; xfail = pytest.mark.xfail
@@ -354,6 +355,40 @@ def test_zodb_onresync():
conn.close()
# verify that ZODB.Connection.onShutdownCallback works
@func
def test_zodb_onshutdown():
stor = testdb.getZODBStorage()
defer(stor.close)
db = DB(stor)
class T:
def __init__(t):
t.nshutdown = 0
def on_connection_shutdown(t):
t.nshutdown += 1
t1 = T()
t2 = T()
# conn1 stays alive outside of db.pool
conn1 = db.open()
conn1.onShutdownCallback(t1)
# conn2 stays alive inside db.pool
conn2 = db.open()
conn2.onShutdownCallback(t2)
conn2.close()
assert t1.nshutdown == 0
assert t2.nshutdown == 0
# db.close triggers conn1 and conn2 shutdown
db.close()
assert t1.nshutdown == 1
assert t2.nshutdown == 1
# test that zurl does not change from one open to another storage open.
def test_zurlstable():
if not isinstance(testdb, (testing.TestDB_FileStorage, testing.TestDB_ZEO, testing.TestDB_NEO)):
@@ -296,6 +296,30 @@ else:
raise AssertionError("ZODB3 is not supported anymore")
# patch for ZODB.Connection to support callback after the database is closed
ZODB.Connection.Connection._onShutdownCallbacks = None
def Connection_onShutdownCallback(self, f):
if self._onShutdownCallbacks is None:
# NOTE WeakSet does not work for bound methods - they are always created
# anew for each obj.method access, and thus will go away almost immediately
self._onShutdownCallbacks = WeakSet()
self._onShutdownCallbacks.add(f)
assert not hasattr(ZODB.Connection.Connection, 'onShutdownCallback')
ZODB.Connection.Connection.onShutdownCallback = Connection_onShutdownCallback
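The NOTE about WeakSet and bound methods is easy to trip over; here is a minimal standalone illustration (CPython semantics assumed, names are hypothetical):

    import weakref

    class T:
        def meth(self):
            pass

    t = T()
    m = weakref.ref(t.meth)   # t.meth creates a fresh bound-method object
    r = weakref.ref(t)
    assert m() is None        # ...which is collected as soon as the statement ends
    assert r() is t           # while a weakref to t itself stays valid

This is why the callbacks registry stores the owner object itself (on which on_connection_shutdown is later invoked), not its bound method.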
_orig_DB_close = ZODB.DB.close
def _ZDB_close(self):
# the same code for ZODB3/4/5
@self._connectionMap
def _(conn):
if conn._onShutdownCallbacks:
for f in conn._onShutdownCallbacks:
f.on_connection_shutdown()
_orig_DB_close(self)
ZODB.DB.close = _ZDB_close
# zstor_2zurl converts a ZODB storage to URL to access it.
def zstor_2zurl(zstor):
@@ -20,6 +20,9 @@
"""Module wcfs.py provides python gateway for spawning and interoperating with wcfs server.
Serve(zurl) starts and runs WCFS server for ZODB at zurl.
Start(zurl) starts WCFS server for ZODB at zurl and returns corresponding Server object.
Join(zurl) joins wcfs server for ZODB at zurl and returns WCFS object that
represents filesystem-level connection to joined wcfs server. If wcfs server
for zurl is not yet running, it will be automatically started if join is given
@@ -58,18 +61,20 @@ The following environment variables can be used to control wcfs.py client:
no join: don't spawn wcfs server unless explicitly requested via autostart=True
$WENDELIN_CORE_WCFS_OPTIONS
"" additional options to pass to wcfs server when spawned it
"" serve/start/join: additional options to pass to wcfs server when spawning it
"""
from __future__ import print_function, absolute_import
import os, sys, hashlib, subprocess, time, stat
import os, sys, hashlib, subprocess, stat
import logging as log
from os.path import dirname
from stat import S_ISDIR
from errno import ENOENT, ENOTCONN, EEXIST
from signal import SIGTERM, SIGQUIT, SIGKILL
from golang import chan, select, default, func, defer
from golang import sync, context
from golang import context, errors, sync, time
from golang.gcompat import qq
from persistent import Persistent
@@ -82,6 +87,17 @@ from .client._wcfs import \
PyPinReq as PinReq \
# Server represents running wcfs server.
#
# Use start to create it.
class Server:
# .mountpoint path to wcfs mountpoint
# ._proc wcfs process
# ._fuseabort opened /sys/fs/fuse/connections/X/abort for this server
# ._stopOnce
pass
# WCFS represents filesystem-level connection to wcfs server.
#
# Use join to create it.
@@ -98,7 +114,7 @@ class WCFS(_WCFS):
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
# ._njoin this connection was returned for so many joins
# ._proc wcfs process if it was opened by this WCFS | None
# ._wcsrv wcfs Server if it was opened by this WCFS | None
pass
@@ -143,15 +159,15 @@ def _open(wc, obj, mode='rb', at=None):
# ---- join/run wcfs ----
_wcmu = sync.Mutex()
_wcregistry = {} # mntpt -> WCFS
_wcstarted = [] # of WCFS for wcfs we ever _start'ed (for tests)
_wcregistry = {} # mntpt -> WCFS
_wcautostarted = [] # of WCFS, with ._wcsrv != None, for wcfs we ever autostart'ed (for tests)
@func(WCFS)
def __init__(wc, mountpoint, fwcfs, proc):
def __init__(wc, mountpoint, fwcfs, wcsrv):
wc.mountpoint = mountpoint
wc._fwcfs = fwcfs
wc._njoin = 1
wc._proc = proc
wc._wcsrv = wcsrv
# close must be called to release joined connection after it is no longer needed.
@func(WCFS)
@@ -192,19 +208,10 @@ def join(zurl, autostart=_default_autostart()): # -> WCFS
return wc
# no. try opening .wcfs - if we succeed - wcfs is already running.
unclean = False
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno == ENOENT: # wcfs cleanly unmounted
pass
elif e.errno == ENOTCONN: # wcfs crashed/killed
unclean = True
else:
raise
else:
fwcfs, trylockstartf = _try_attach_wcsrv(mntpt)
if fwcfs is not None:
# already have it
wc = WCFS(mntpt, f, None)
wc = WCFS(mntpt, fwcfs, None)
_wcregistry[mntpt] = wc
return wc
@@ -212,29 +219,84 @@ def join(zurl, autostart=_default_autostart()): # -> WCFS
raise RuntimeError("wcfs: join %s: server not running" % zurl)
# start wcfs, telling it to automatically exit when there is no client activity.
trylockstartf() # XXX retry access if another wcfs was started in the meantime
wcsrv, fwcfs = _start(zurl, "-autoexit")
wc = WCFS(mntpt, fwcfs, wcsrv)
_wcautostarted.append(wc)
assert mntpt not in _wcregistry
_wcregistry[mntpt] = wc
return wc
# _try_attach_wcsrv tries to attach to a running wcfs server.
#
# If successful, it returns fwcfs - an opened file handle for /.wcfs/zurl.
# If unsuccessful, it returns fwcfs=None and a trylockstartf function that can
# be used to prepare to start a new WCFS server.
def _try_attach_wcsrv(mntpt): # -> (fwcfs, trylockstartf)
# try opening .wcfs - if we succeed - wcfs is already running.
unclean = False
try:
fwcfs = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno == ENOENT: # wcfs cleanly unmounted
pass
elif e.errno == ENOTCONN: # wcfs crashed/killed
unclean = True
else:
raise
else:
return (fwcfs, None)
# the server is not running.
# return func to prepare start of another wcfs server
def trylockstartf():
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
if unclean:
_fuse_unmount(mntpt)
return _start(zurl, "-autoexit")
return (None, trylockstartf)
# _start starts wcfs server for ZODB @ zurl.
# start starts wcfs server for ZODB @ zurl.
#
# optv can be optionally given to pass flags to wcfs.
# called under _wcmu.
def _start(zurl, *optv): # -> WCFS
def start(zurl, *optv): # -> Server
# verify that wcfs is not already running
mntpt = _mntpt_4zurl(zurl)
log.info("wcfs: starting for %s ...", zurl)
fwcfs, trylockstartf = _try_attach_wcsrv(mntpt)
if fwcfs is not None:
fwcfs.close()
raise RuntimeError("wcfs: start %s: already running" % zurl)
# seems to be ok to start
trylockstartf() # XXX -> "already running" if lock fails
wcsrv, fwcfs = _start(zurl, *optv)
fwcfs.close()
return wcsrv
# _optv_with_wcfs_defaults returns optv prepended with default WCFS options taken from environment.
def _optv_with_wcfs_defaults(optv): # -> optv
optv_defaults = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split()
return tuple(optv_defaults) + tuple(optv)
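For example (option values are hypothetical):

    import os
    os.environ["WENDELIN_CORE_WCFS_OPTIONS"] = "-d -alsologtostderr"
    assert _optv_with_wcfs_defaults(("-autoexit",)) == \
           ("-d", "-alsologtostderr", "-autoexit")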
# _start serves start and join.
@func
def _start(zurl, *optv): # -> Server, fwcfs
mntpt = _mntpt_4zurl(zurl)
optv = _optv_with_wcfs_defaults(optv)
log.info("wcfs: starting for %s ...", zurl)
# XXX errctx "wcfs: start"
# spawn wcfs and wait till filesystem-level access to it is ready
wc = WCFS(mntpt, None, None)
_wcstarted.append(wc)
wcsrv = Server(mntpt, None, None)
wg = sync.WorkGroup(context.background())
fsready = chan(dtype='C.structZ')
def _(ctx):
@@ -256,24 +318,36 @@ def _start(zurl, *optv): # -> WCFS
raise ctx.err()
if _ == 1:
# startup was ok - don't monitor spawned wcfs any longer
wc._proc = proc
wcsrv._proc = proc
return
time.sleep(0.1)
time.sleep(0.1*time.second)
wg.go(_)
def _(ctx):
# XXX errctx "waitmount"
fwcfs = _waitmount(ctx, zurl, mntpt)
wc._fwcfs = fwcfs
wcsrv._fwcfs = fwcfs
fsready.close()
wg.go(_)
wg.wait()
log.info("wcfs: started pid%d @ %s", wcsrv._proc.pid, mntpt)
fwcfs = wcsrv._fwcfs
del wcsrv._fwcfs
# open fuse abort control file
# shutdown wcsrv if that open fails
try:
x = os.minor(os.stat(wcsrv.mountpoint).st_dev)
wcsrv._fuseabort = open("/sys/fs/fuse/connections/%d/abort" % x, "wb")
except:
defer(wcsrv.stop)
defer(fwcfs.close)
raise
assert mntpt not in _wcregistry
_wcregistry[mntpt] = wc
return wc
return wcsrv, fwcfs
# _waitmount waits for wcfs filesystem for zurl @mntpt to become ready.
def _waitmount(ctx, zurl, mntpt): # -> fwcfs
@@ -298,7 +372,100 @@ def _waitmount(ctx, zurl, mntpt): # -> fwcfs
if _ == 0:
raise ctx.err()
time.sleep(0.1)
time.sleep(0.1*time.second)
@func(Server)
def __init__(wcsrv, mountpoint, proc, ffuseabort):
wcsrv.mountpoint = mountpoint
wcsrv._proc = proc
wcsrv._fuseabort = ffuseabort
wcsrv._stopOnce = sync.Once()
# stop shuts down the server.
@func(Server)
def stop(wcsrv, ctx=None):
if ctx is None:
ctx, cancel = context.with_timeout(context.background(), 10*time.second)
defer(cancel)
wcsrv._stop(ctx)
@func(Server)
def _stop(wcsrv, ctx, _onstuck=None):
def _():
wcsrv.__stop(ctx, _onstuck)
wcsrv._stopOnce.do(_)
@func(Server)
def __stop(wcsrv, ctx, _onstuck):
log.info("wcfs: unmount/stop wcfs pid%d @ %s", wcsrv._proc.pid, wcsrv.mountpoint)
deadline = ctx.deadline()
if deadline is None:
deadline = float('inf')
timeoutTotal = (deadline - time.now())
if timeoutTotal < 0:
timeoutTotal = 0.
# timeoutFrac returns ctx with `timeout ~= fraction·totalTimeout`;
# however, if the context is already cancelled, the returned timeout is 0.1s
# to give the operation a chance to complete.
def timeoutFrac(fraction):
if _ready(ctx.done()):
tctx, _ = context.with_timeout(context.background(), 0.1*time.second)
else:
tctx, _ = context.with_timeout(ctx, fraction*timeoutTotal)
return tctx
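As a concrete check of how the escalation below divides its budget, assuming e.g. a 10 second deadline:

    timeoutTotal = 10.0
    term_wait = 0.5  * timeoutTotal   # up to 5.0s waiting after fusermount -u / SIGTERM
    quit_wait = 0.25 * timeoutTotal   # up to 2.5s more after SIGQUIT
    kill_wait = 0.25 * timeoutTotal   # up to 2.5s more after SIGKILL
    assert term_wait + quit_wait + kill_wait == timeoutTotal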
# unmount and wait for wcfs to exit
# kill wcfs and abort FUSE connection if clean unmount fails
def _():
if wcsrv._fuseabort is not None:
wcsrv._fuseabort.close()
defer(_)
@func
def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself
if _procwait_(timeoutFrac(0.5), wcsrv._proc):
return
log.warn("wcfs.go does not exit (after SIGTERM)")
log.warn("-> kill -QUIT wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGQUIT)
if _procwait_(timeoutFrac(0.25), wcsrv._proc):
return
log.warn("wcfs.go does not exit (after SIGQUIT)")
log.warn("-> kill -KILL wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGKILL)
if _procwait_(timeoutFrac(0.25), wcsrv._proc):
return
log.warn("wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
log.warn("-> nothing we can do...") # XXX dump /proc/pid/task/*/stack instead (ignore EPERM)
if _onstuck is not None:
_onstuck()
else:
_procwait(context.background(), wcsrv._proc)
defer(_)
try:
if _is_mountpoint(wcsrv.mountpoint): # could be unmounted from outside
_fuse_unmount(wcsrv.mountpoint)
except:
# if clean unmount failed -> kill -TERM wcfs and force abort of fuse connection.
#
# aborting fuse connection is needed in case wcfs/kernel will be stuck
# in a deadlock even after being `kill -9`. See comments in tWCFS for details.
def _():
log.warn("-> kill -TERM wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGTERM)
if wcsrv._fuseabort is not None:
log.warn("-> abort FUSE connection ...")
wcsrv._fuseabort.write(b"1\n")
wcsrv._fuseabort.flush()
defer(_)
raise
# ---- misc ----
@@ -333,7 +500,7 @@ def _mntpt_4zurl(zurl):
if _mkdir_p(wcfsroot):
os.chmod(wcfsroot, wcfsmode)
else:
# migration workaround for the situation when /tmp/wcfs was created by
# migration workaround for the situation when /dev/shm/wcfs was created by
# code that did not yet set sticky bit.
_ = os.stat(wcfsroot)
if _.st_uid == os.getuid():
@@ -355,9 +522,126 @@ def _mkdir_p(path, mode=0o777): # -> created(bool)
return False
return True
# fusermount -u.
# _fuse_unmount calls `fusermount -u` + logs details if unmount failed.
@func
def _fuse_unmount(mntpt):
subprocess.check_call(["fusermount", "-u", mntpt])
ret, out = _sysproccallout(["fusermount", "-u", mntpt])
if ret != 0:
# unmount failed, usually due to "device is busy".
# Log which files are still opened and reraise
def _():
log.warn("# lsof %s" % mntpt)
# -w to avoid lots of
# lsof: WARNING: can't stat() fuse.wcfs file system /dev/shm/wcfs/X
# Output information may be incomplete.
# if there are other uncleaned wcfs mountpoints.
# (lsof stats all filesystems on startup)
# NOTE lsof +D misbehaves - don't use it
ret, out = _sysproccallout(["lsof", "-w", mntpt])
log.warn(out)
if ret:
log.warn("(lsof failed)")
defer(_)
out = out.rstrip() # kill trailing \n\n
emsg = "fuse_unmount %s: failed: %s" % (mntpt, out)
log.warn(emsg)
raise RuntimeError("%s\n(more details logged)" % emsg)
# _is_mountpoint returns whether path is a mountpoint
def _is_mountpoint(path): # -> bool
# NOTE we don't call mountpoint directly on path, because if FUSE
# fileserver failed, the mountpoint will also fail and print ENOTCONN
try:
_ = os.lstat(path)
except OSError as e:
if e.errno == ENOENT:
return False
# "Transport endpoint is not connected" -> it is a failed FUSE server
# (XXX we can also grep /proc/mounts)
if e.errno == ENOTCONN:
return True
raise
if not S_ISDIR(_.st_mode):
return False
mounted = (0 == _sysproccall(["mountpoint", "-q", path]))
return mounted
# _sysproc creates subprocess.Popen for "system" command.
#
# System commands are those that reside either in /bin or /usr/bin and which
# should be found even if $PATH does not contain those directories. For example
# runUnitTest in ERP5 sets $PATH without /bin, and this way executing
# fusermount via subprocess.Popen instead of _sysproc would fail.
def _sysproc(argv, **kw): # -> subprocess.Popen
env = kw.get('env', None)
if env is None:
env = os.environ
env = env.copy()
path = env.get('PATH', '')
if path:
path += ':'
path += '/bin:/usr/bin'
env['PATH'] = path
return subprocess.Popen(argv, env=env, close_fds=True, **kw)
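A usage sketch (assuming /bin/true exists, as on typical Linux systems):

    # found even if the caller's $PATH lacked /bin, because _sysproc appends
    # /bin:/usr/bin to the child environment's $PATH, which subprocess uses
    # to locate the program
    assert 0 == _sysproc(["true"]).wait()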
# _sysproccall calls _sysproc and waits for spawned program to complete.
def _sysproccall(argv, **kw): # -> retcode
return _sysproc(argv, **kw).wait()
# _sysproccallout calls _sysproc, waits for spawned program to complete and returns combined out/err.
def _sysproccallout(argv, **kw): # -> retcode, output
proc = _sysproc(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw)
out, _ = proc.communicate()
return proc.returncode, out
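For example (output is bytes; stderr is folded into stdout):

    ret, out = _sysproccallout(["sh", "-c", "echo hello; echo oops >&2"])
    assert (ret, out) == (0, b"hello\noops\n")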
# _procwait waits for a process (subprocess.Popen) to terminate.
def _procwait(ctx, proc):
_waitfor(ctx, lambda: proc.poll() is not None)
# _procwait_, similarly to _procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns bool whether process terminated or not - e.g. due to context being canceled.
def _procwait_(ctx, proc): # -> ok
return _waitfor_(ctx, lambda: proc.poll() is not None)
# _waitfor waits for condf() to become true.
def _waitfor(ctx, condf):
wg = sync.WorkGroup(ctx)
def _(ctx):
while 1:
if _ready(ctx.done()):
raise ctx.err()
if condf():
return
time.sleep(10*time.millisecond)
wg.go(_)
wg.wait()
# _waitfor_, similarly to _waitfor, waits for condf() to become true.
#
# it returns bool whether target condition was reached or not - e.g. due to
# context being canceled.
def _waitfor_(ctx, condf): # -> ok
try:
_waitfor(ctx, condf)
except Exception as e:
if errors.Is(e, context.canceled) or errors.Is(e, context.deadlineExceeded):
return False
raise
return True
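For illustration, a condition that never becomes true makes _waitfor_ return False once the deadline passes, instead of raising:

    ctx, cancel = context.with_timeout(context.background(), 0.2*time.second)
    assert _waitfor_(ctx, lambda: False) == False   # deadline exceeded -> False
    cancel()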
# _ready reports whether chan ch is ready.
def _ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
return bool(_)
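For example, with a pygolang channel:

    ch = chan()                  # unbuffered
    assert _ready(ch) == False   # recv would block
    ch.close()
    assert _ready(ch) == True    # recv on a closed chan completes immediately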
# serve starts and runs wcfs server for ZODB @ zurl.
@@ -373,30 +657,18 @@ def _fuse_unmount(mntpt):
# serve(zurl, exec_=False).
def serve(zurl, optv, exec_=False, _tstartingq=None):
mntpt = _mntpt_4zurl(zurl)
# XXX take $WENDELIN_CORE_WCFS_OPTIONS into account?
optv = _optv_with_wcfs_defaults(optv)
log.info("wcfs: serving %s ...", zurl)
# try opening .wcfs - it is an error if we can do it.
# XXX -> option to wcfs itself to verify wcfs/something is already mounted?
unclean = False
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno == ENOENT: # wcfs cleanly unmounted
pass
elif e.errno == ENOTCONN: # wcfs crashed/killed
unclean = True
else:
raise
else:
f.close()
raise RuntimeError("wcfs: start %s: already running" % zurl)
fwcfs, trylockstartf = _try_attach_wcsrv(mntpt)
if fwcfs is not None:
fwcfs.close()
raise RuntimeError("wcfs: serve %s: already running" % zurl)
# seems to be ok to start
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
if unclean:
_fuse_unmount(mntpt)
trylockstartf() # XXX -> "already running" if lock fails
if _tstartingq is not None:
_tstartingq.close()
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
@@ -58,7 +58,7 @@ cdef wcfs.PyConn pywconnOf(zconn):
zconn._wcfs_wconn = wconn
# keep wconn view of the database in sync with zconn
# wconn and wc (= wconn.wc) will be closed when zconn is garbage-collected
# wconn and wc (= wconn.wc) will be closed when zconn is garbage-collected or shutdown via DB.close
_ZSync(zconn, wconn)
return wconn
@@ -66,8 +66,8 @@ cdef wcfs.PyConn pywconnOf(zconn):
# _ZSync keeps wconn in sync with zconn.
#
# wconn will be closed once zconn is destroyed (not closed, which returns it
# back into DB pool).
# wconn will be closed once zconn is garbage-collected (not closed, which
# returns it back into DB pool), or once zconn.db is closed.
#
# _ZSync cares itself to stay alive as long as zconn stays alive.
_zsyncReg = {} # id(zsync) -> zsync (protected by GIL)
@@ -79,8 +79,12 @@ class _ZSync:
#print('ZSync: setup %r <-> %r' % (wconn, zconn))
assert zconn.opened
zsync.wconn = wconn
# notify us on zconn GC
zsync.zconn_ref = weakref.ref(zconn, zsync.on_zconn_dealloc)
# notify us on zconn.db.close
zconn.onShutdownCallback(zsync)
# notify us when zconn changes its view of the database
# NOTE zconn.onOpenCallback is not enough: zconn.at can change even
# without zconn.close/zconn.open, e.g.:
# zconn = DB.open(transaction_manager=tm)
@@ -98,8 +102,14 @@ class _ZSync:
if 1: # = `with gil:` (GIL already held in python code)
_zsyncReg[id(zsync)] = zsync
# .zconn dealloc -> wconn.close; release zsync
def on_zconn_dealloc(zsync, _):
# _release1 closes .wconn and releases zsync once.
def _release1(zsync):
# unregister zsync from being kept alive
if 1: # = `with gil:` (see note in __init__)
_ = _zsyncReg.pop(id(zsync), None)
if _ is None:
return # another call already done/is simultaneously doing release1
#print('ZSync: sched break %r <-> .' % (zsync.wconn,))
# schedule wconn.close() + wconn.wc.close()
_zsync_wclose_wg.add(1)
@@ -112,9 +122,13 @@
_zsync_releaseq.send(zsync.wconn)
"""
# unregister zsync from being kept alive
if 1: # = `with gil:` (see note in __init__)
del _zsyncReg[id(zsync)]
# .zconn dealloc -> wconn.close; release zsync.
def on_zconn_dealloc(zsync, _):
zsync._release1()
# DB.close -> wconn.close; release zsync.
def on_connection_shutdown(zsync):
zsync._release1()
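The pop-based guard in _release1 is what makes the two triggers above (zconn garbage collection and DB.close) safe to fire in any order, or both; a minimal standalone sketch of the same idempotent-release pattern (names are hypothetical):

    _reg = {}   # id(obj) -> obj; also keeps obj alive until released

    class Resource:
        def __init__(self):
            _reg[id(self)] = self
        def release_once(self):
            if _reg.pop(id(self), None) is None:
                return           # already released via the other trigger
            print("released")    # actual cleanup would go here

    r = Resource()
    r.release_once()   # -> released
    r.release_once()   # no-op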
# DB resyncs .zconn onto new database view.
# -> resync .wconn to updated database view of ZODB connection.
......
@@ -39,12 +39,10 @@ def setup_module():
def teardown_module():
testdb.teardown()
# verify that ZSync keeps wconn in sync wrt zconn.
@func
def test_zsync():
zstor = testdb.getZODBStorage()
defer(zstor.close)
# _zsync_setup setups up DB, zconn and wconn _ZSync'ed to zconn.
@func
def _zsync_setup(zstor): # -> (db, zconn, wconn)
zurl = zstor_2zurl(zstor)
# create new DB that we'll precisely control
@@ -53,7 +51,6 @@ def test_zsync():
at0 = zconn_at(zconn)
# create wconn
wc = wcfs.join(zurl)
wc_njoin0 = wc._njoin
wconn = wc.connect(at0)
assert wconn.at() == at0
# setup ZSync for wconn <-> zconn; don't keep zsync explicitly referenced
@@ -61,8 +58,63 @@ def test_zsync():
_ZSync(zconn, wconn)
assert wconn.at() == at0
return db, zconn, wconn
# verify that ZSync closes wconn when db is closed.
@func
def test_zsync_db_close():
zstor = testdb.getZODBStorage()
defer(zstor.close)
db, zconn, wconn = _zsync_setup(zstor)
defer(wconn.close)
# close db -> ZSync should close wconn and wc even though zconn stays referenced
wc_njoin0 = wconn.wc._njoin
db.close()
_zsync_wclose_wg.wait()
# NOTE db and zconn are still alive - not GC'ed
with raises(error, match=": connection closed"):
wconn.open(p64(0))
assert wconn.wc._njoin == (wc_njoin0 - 1)
# verify that ZSync closes wconn when zconn is garbage-collected.
@func
def test_zsync_zconn_gc():
zstor = testdb.getZODBStorage()
defer(zstor.close)
db, zconn, wconn = _zsync_setup(zstor)
defer(wconn.close)
# del zconn -> zconn should disappear and ZSync should close wconn and wc
zconn_weak = weakref.ref(zconn)
assert zconn_weak() is not None
wc_njoin0 = wconn.wc._njoin
del zconn
# NOTE db stays alive and not closed
gc.collect()
assert zconn_weak() is None
_zsync_wclose_wg.wait()
with raises(error, match=": connection closed"):
wconn.open(p64(0))
assert wconn.wc._njoin == (wc_njoin0 - 1)
# verify that ZSync keeps wconn in sync wrt zconn.
@func
def test_zsync_resync():
zstor = testdb.getZODBStorage()
defer(zstor.close)
db, zconn, wconn = _zsync_setup(zstor)
defer(db.close)
# commit something - ZSync should resync wconn to updated db state
at0 = zconn_at(zconn)
assert wconn.at() == at0
root = zconn.root()
root['tzync'] = 1
transaction.commit()
@@ -97,13 +149,3 @@ def test_zsync():
assert zconn_weak() is zconn
assert zconn_at(zconn) == at2
assert wconn.at() == at2
# close db -> zconn should disappear and ZSync should close wconn and wc
del zconn
db.close()
gc.collect()
assert zconn_weak() is None
_zsync_wclose_wg.wait()
with raises(error, match=": connection closed"):
wconn.open(p64(0))
assert wc._njoin == wc_njoin0 - 1
@@ -29,7 +29,8 @@ from __future__ import print_function, absolute_import
from golang import func, defer, error, b
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, waitfor_, eprint
from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, eprint
from wendelin.wcfs import _waitfor_ as waitfor_
from wendelin.wcfs import wcfs_test
from wendelin.wcfs.internal.wcfs_test import read_mustfault
from wendelin.wcfs.internal import mm
@@ -53,9 +53,8 @@ cdef class _tWCFS:
# but pin handler is failing one way or another - select will wake-up
# but, if _abort_ontimeout uses GIL, won't continue to run trying to lock
# GIL -> deadlock.
def _abort_ontimeout(_tWCFS t, double dt, pychan nogilready not None):
def _abort_ontimeout(_tWCFS t, int fdabort, double dt, pychan nogilready not None):
cdef chan[double] timeoutch = time.after(dt)
cdef int fdabort = t._wcfuseabort.fileno()
emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second)
cdef char *_emsg1 = emsg1
with nogil:
@@ -43,9 +43,7 @@ from ZODB.utils import z64, u64, p64
import sys, os, os.path, subprocess
from thread import get_ident as gettid
from time import gmtime
from errno import EINVAL, ENOENT, ENOTCONN
from stat import S_ISDIR
from signal import SIGQUIT, SIGKILL
from errno import EINVAL, ENOTCONN
from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, default, error, b
from golang import context, errors, sync, time
@@ -55,6 +53,7 @@ from pytest import raises, fail
from wendelin.wcfs.internal import io, mm
from wendelin.wcfs.internal.wcfs_test import _tWCFS, read_nogil, install_sigbus_trap, fadvise_dontneed
from wendelin.wcfs.client._wcfs import _tpywlinkwrite as _twlinkwrite
from wendelin.wcfs import _is_mountpoint as is_mountpoint, _procwait as procwait, _ready as ready
# setup:
@@ -113,26 +112,9 @@ def teardown_function(f):
procmounts_lookup_wcfs(testzurl)
# fuse_unmount unmounts FUSE filesystem mounted @ mntpt.
@func
def fuse_unmount(mntpt):
assert is_mountpoint(mntpt)
try:
wcfs._fuse_unmount(mntpt)
except subprocess.CalledProcessError:
# unmount failed, usually due to "device is busy".
# Print which files are still opened and reraise
def _():
print("# lsof %s" % mntpt)
# -w to avoid lots of
# lsof: WARNING: can't stat() fuse.wcfs file system /tmp/wcfs/X
# Output information may be incomplete.
# if there are other uncleaned wcfs mountpoints.
# (lsof stats all filesystems on startup)
# XXX -> better use `fuser -m <mntpt>` (it says it will limit search to files only under mntpt) ?
subprocess.check_call(["lsof", "-w", "+D", mntpt])
defer(_)
raise
wcfs._fuse_unmount(mntpt)
# ---- test join/autostart/serve ----
@@ -149,13 +131,18 @@ def test_join():
assert wcfs._wcregistry == {}
defer(_)
wc = wcfs._start(zurl)
wcsrv = wcfs.start(zurl)
defer(wcsrv.stop)
assert wcsrv.mountpoint == testmntpt
assert readfile(wcsrv.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wcsrv.mountpoint + "/head")
assert os.path.isdir(wcsrv.mountpoint + "/head/bigfile")
wc = wcfs.join(zurl, autostart=False)
defer(wc.close)
assert wc.mountpoint == testmntpt
assert wc.mountpoint == wcsrv.mountpoint
assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile")
assert wc._wcsrv is None
wc2 = wcfs.join(zurl, autostart=False)
defer(wc2.close)
@@ -211,6 +198,28 @@ def test_join_after_crash():
procmounts_lookup_wcfs(zurl)
# verify that start successfully starts server if previous wcfs exited uncleanly.
@func
def test_start_after_crash():
zurl = testzurl
mntpt = testmntpt
wc = start_and_crash_wcfs(zurl, mntpt)
wcsrv = wcfs.start(zurl)
defer(wcsrv.stop)
assert wcsrv.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
# /proc/mounts should contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
# stop the server - /proc/mounts entry should be gone
wcsrv.stop()
with raises(KeyError):
procmounts_lookup_wcfs(zurl)
# verify that serve successfully starts if previous wcfs exited uncleanly.
@func
def test_serve_after_crash():
@@ -247,7 +256,11 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
procmounts_lookup_wcfs(zurl)
# start the server with attached client
wc = wcfs._start(zurl)
wcsrv = wcfs.start(zurl)
assert wcsrv.mountpoint == mntpt
assert mntpt not in wcfs._wcregistry
wc = wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry[mntpt] is wc
assert wc.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
@@ -257,8 +270,8 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
# kill the server
wc._proc.kill() # sends SIGKILL
assert wc._proc.wait() != 0
wcsrv._proc.kill() # sends SIGKILL
assert wcsrv._proc.wait() != 0
# access to filesystem should raise "Transport endpoint not connected"
with raises(IOError) as exc:
@@ -335,14 +348,12 @@ class DFile:
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tWCFS(_tWCFS):
@func
def __init__(t, wc=None):
# `wc != None` is used to create tWCFS on existing WCFS connection
if wc is None:
assert not os.path.exists(testmntpt)
wc = wcfs.join(testzurl, autostart=True)
assert wc.mountpoint == testmntpt
assert os.path.exists(wc.mountpoint)
assert is_mountpoint(wc.mountpoint)
def __init__(t):
assert not os.path.exists(testmntpt)
wc = wcfs.join(testzurl, autostart=True)
assert wc.mountpoint == testmntpt
assert os.path.exists(wc.mountpoint)
assert is_mountpoint(wc.mountpoint)
t.wc = wc
# force-unmount wcfs on timeout to unstuck current test and let it fail.
@@ -351,60 +362,35 @@ class tWCFS(_tWCFS):
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal )
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.minor(os.stat(t.wc.mountpoint).st_dev), "w")
nogilready = chan(dtype='C.structZ')
go(t._abort_ontimeout, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
t._wcfuseabort = os.dup(wc._wcsrv._fuseabort.fileno())
go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil
# close closes connection to wcfs, unmounts the filesystem and makes sure
# that wcfs server exits.
@func
def close(t):
defer(t._wcfuseabort.close)
def _():
os.close(t._wcfuseabort)
defer(t._closed.close)
# unmount and wait for wcfs to exit
def _():
# run `fusermount -u` a second time, if wcfs was killed, to
# clean up /proc/mounts.
if is_mountpoint(t.wc.mountpoint):
fuse_unmount(t.wc.mountpoint)
assert not is_mountpoint(t.wc.mountpoint)
os.rmdir(t.wc.mountpoint)
defer(_)
@func
def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself
if procwait_(timeout(), t.wc._proc):
return
# run `fusermount -u` the second time after we kill wcfs to cleanup
# /proc/mounts and avoid `assert not is_mountpoint` above.
def _():
if is_mountpoint(t.wc.mountpoint):
fuse_unmount(t.wc.mountpoint)
defer(_)
eprint("\nC: wcfs.go does not exit")
eprint("-> kill -QUIT wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGQUIT)
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit (after SIGQUIT)")
eprint("-> kill -KILL wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGKILL)
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
eprint("-> nothing we can do...\n") # XXX dump /proc/pid/task/*/stack instead (ignore EPERM)
fail("wcfs.go does not exit even after SIGKILL")
defer(_)
def _():
#if not ready(t._wcfuseaborted): XXX kill _wcfuseaborted ?
# assert 0 == subprocess.call(["mountpoint", "-q", t.wc.mountpoint])
assert is_mountpoint(t.wc.mountpoint)
fuse_unmount(t.wc.mountpoint)
def onstuck():
fail("wcfs.go does not exit even after SIGKILL")
t.wc._wcsrv._stop(timeout(), _onstuck=onstuck)
defer(_)
t.wc.close()
defer(t.wc.close)
assert is_mountpoint(t.wc.mountpoint)
class tDB(tWCFS):
@@ -1971,72 +1957,6 @@ def dump_history(t):
print()
# ready reports whether chan ch is ready.
def ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
return bool(_)
# procwait waits for a process (subprocess.Popen) to terminate.
def procwait(ctx, proc):
waitfor(ctx, lambda: proc.poll() is not None)
# procwait_, similarly to procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns bool whether process terminated or not - e.g. due to context being canceled.
def procwait_(ctx, proc): # -> ok
return waitfor_(ctx, lambda: proc.poll() is not None)
# waitfor waits for condf() to become true.
def waitfor(ctx, condf):
wg = sync.WorkGroup(ctx)
def _(ctx):
while 1:
if ready(ctx.done()):
raise ctx.err()
if condf():
return
tdelay()
wg.go(_)
wg.wait()
# waitfor_, similarly to waitfor, waits for condf() to become true.
#
# it returns bool whether target condition was reached or not - e.g. due to
# context being canceled.
def waitfor_(ctx, proc): # -> ok
try:
waitfor(ctx, proc)
except Exception as e:
if errors.Is(e, context.canceled) or errors.Is(e, context.deadlineExceeded):
return False
raise
return True
# is_mountpoint returns whether path is a mountpoint
def is_mountpoint(path): # -> bool
# NOTE we don't call mountpoint directly on path, because if FUSE
# fileserver failed, the mountpoint will also fail and print ENOTCONN
try:
_ = os.lstat(path)
except OSError as e:
if e.errno == ENOENT:
return False
# "Transport endpoint is not connected" -> it is a failed FUSE server
# (XXX we can also grep /proc/mounts)
if e.errno == ENOTCONN:
return True
raise
if not S_ISDIR(_.st_mode):
return False
mounted = (0 == subprocess.call(["mountpoint", "-q", path]))
return mounted
# procmounts_lookup_wcfs returns /proc/mounts entry for wcfs mounted to serve zurl.
def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError
for line in readfile('/proc/mounts').splitlines():