Commit 5bfa8cf8 authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: Add start to spawn a Server that can be later stopped (draft)

Implement Server.stop using most of the functionality that was
previously living in tWCFS test part.
parent 4188ea9e
...@@ -46,8 +46,18 @@ def transaction_reset(): ...@@ -46,8 +46,18 @@ def transaction_reset():
# nothing to run after test # nothing to run after test
# Before pytest exits, teardown WCFS(s) that we automatically spawned during # enable log_cli on no-capture
# test runs in bigfile/bigarray/... # (output during a test is a mixture of print and log)
# enable pytest live logging (log_cli) when output capture is disabled,
# because with -s the test output is a mixture of print and log.
def pytest_configure(config):
    # NOTE(review): nesting reconstructed from a diff view with stripped
    # indentation — confirm `verbose` handling is inside the no-capture branch.
    if config.option.capture == "no":
        config.inicfg['log_cli'] = "true"
        # verify the inicfg override actually took effect
        assert config.getini("log_cli") is True
        if config.option.verbose:
            # with -v also emit INFO-level records to the terminal
            config.inicfg['log_cli_level'] = "INFO"
# Before pytest exits, teardown WCFS server(s) that we automatically spawned
# during test runs in bigfile/bigarray/...
# #
# If we do not do this, spawned wcfs servers are left running _and_ connected # If we do not do this, spawned wcfs servers are left running _and_ connected
# by stdout to nxdtest input - which makes nxdtest to wait for them to exit. # by stdout to nxdtest input - which makes nxdtest to wait for them to exit.
...@@ -58,18 +68,13 @@ def pytest_unconfigure(config): ...@@ -58,18 +68,13 @@ def pytest_unconfigure(config):
gc.collect() gc.collect()
from wendelin import wcfs from wendelin import wcfs
for wc in wcfs._wcstarted: for wc in wcfs._wcautostarted:
if wc._proc.poll() is not None:
continue # this wcfs server already exited
# NOTE: defer instead of direct call - to call all wc.close if there # NOTE: defer instead of direct call - to call all wc.close if there
# was multiple wc spawned, and proceeding till the end even if any # was multiple wc spawned, and proceeding till the end even if any
# particular call raises exception. # particular call raises exception.
defer(partial(_wcclose, wc)) defer(partial(_wcclose_and_stop, wc))
@func
def _wcclose(wc): def _wcclose_and_stop(wc):
from wendelin.wcfs.wcfs_test import tWCFS defer(wc._wcsrv.stop)
print("# unmount/stop wcfs pid%d @ %s" % (wc._proc.pid, wc.mountpoint)) defer(wc.close)
twc = tWCFS(wc=wc)
twc.close()
...@@ -25,6 +25,9 @@ represents filesystem-level connection to joined wcfs server. If wcfs server ...@@ -25,6 +25,9 @@ represents filesystem-level connection to joined wcfs server. If wcfs server
for zurl is not yet running, it will be automatically started if join is given for zurl is not yet running, it will be automatically started if join is given
`autostart=True` option. `autostart=True` option.
Serve(zurl) starts and runs WCFS server for ZODB at zurl.
Start(zurl) starts WCFS server for ZODB at zurl and returns corresponding Server object.
The rest of wcfs.py merely wraps C++ wcfs client package: The rest of wcfs.py merely wraps C++ wcfs client package:
- `WCFS` represents filesystem-level connection to wcfs server. - `WCFS` represents filesystem-level connection to wcfs server.
...@@ -58,18 +61,20 @@ The following environment variables can be used to control wcfs.py client: ...@@ -58,18 +61,20 @@ The following environment variables can be used to control wcfs.py client:
no join: don't spawn wcfs server unless explicitly requested via autostart=True no join: don't spawn wcfs server unless explicitly requested via autostart=True
$WENDELIN_CORE_WCFS_OPTIONS $WENDELIN_CORE_WCFS_OPTIONS
"" additional options to pass to wcfs server when spawned it "" join/start: additional options to pass to wcfs server when spawning it
""" """
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
import os, sys, hashlib, subprocess, time, stat import os, sys, hashlib, subprocess, stat
import logging as log import logging as log
from os.path import dirname from os.path import dirname
from stat import S_ISDIR
from errno import ENOENT, ENOTCONN, EEXIST from errno import ENOENT, ENOTCONN, EEXIST
from signal import SIGQUIT, SIGKILL
from golang import chan, select, default, func, defer from golang import chan, select, default, func, defer
from golang import sync, context from golang import context, errors, sync, time
from golang.gcompat import qq from golang.gcompat import qq
from persistent import Persistent from persistent import Persistent
...@@ -98,7 +103,18 @@ class WCFS(_WCFS): ...@@ -98,7 +103,18 @@ class WCFS(_WCFS):
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly) # ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
# ._njoin this connection was returned for so many joins # ._njoin this connection was returned for so many joins
# ._proc wcfs process if it was opened by this WCFS | None # ._wcsrv wcfs Server if it was opened by this WCFS | None
pass
# Server represents running wcfs server.
#
# Use start to create it.
class Server:
# .mountpoint path to wcfs mountpoint
# ._proc wcfs process
# ._fuseabort opened /sys/fs/fuse/connections/X/abort for this server
# ._stopOnce
pass pass
...@@ -143,15 +159,15 @@ def _open(wc, obj, mode='rb', at=None): ...@@ -143,15 +159,15 @@ def _open(wc, obj, mode='rb', at=None):
# ---- join/run wcfs ---- # ---- join/run wcfs ----
_wcmu = sync.Mutex() _wcmu = sync.Mutex()
_wcregistry = {} # mntpt -> WCFS _wcregistry = {} # mntpt -> WCFS
_wcstarted = [] # of WCFS for wcfs we ever _start'ed (for tests) _wcautostarted = [] # of WCFS, with ._wcsrv != None, for wcfs we ever autostart'ed (for tests)
@func(WCFS) @func(WCFS)
def __init__(wc, mountpoint, fwcfs, proc): def __init__(wc, mountpoint, fwcfs, wcsrv):
wc.mountpoint = mountpoint wc.mountpoint = mountpoint
wc._fwcfs = fwcfs wc._fwcfs = fwcfs
wc._njoin = 1 wc._njoin = 1
wc._proc = proc wc._wcsrv = wcsrv
# close must be called to release joined connection after it is no longer needed. # close must be called to release joined connection after it is no longer needed.
@func(WCFS) @func(WCFS)
...@@ -216,14 +232,28 @@ def join(zurl, autostart=_default_autostart()): # -> WCFS ...@@ -216,14 +232,28 @@ def join(zurl, autostart=_default_autostart()): # -> WCFS
# TODO -> fs-level locking # TODO -> fs-level locking
if unclean: if unclean:
_fuse_unmount(mntpt) _fuse_unmount(mntpt)
return _start(zurl, "-autoexit")
wcsrv, fwcfs = _start(zurl, "-autoexit")
wc = WCFS(mntpt, fwcfs, wcsrv)
_wcautostarted.append(wc)
assert mntpt not in _wcregistry
_wcregistry[mntpt] = wc
return wc
# _start starts wcfs server for ZODB @ zurl. # start starts wcfs server for ZODB @ zurl.
# #
# optv can be optionally given to pass flags to wcfs. # optv can be optionally given to pass flags to wcfs.
# called under _wcmu. def start(zurl, *optv): # -> Server
def _start(zurl, *optv): # -> WCFS # XXX check already started?
wcsrv, fwcfs = _start(zurl, *optv)
fwcfs.close()
return wcsrv
# _start serves start and join.
@func
def _start(zurl, *optv): # -> Server, fwcfs
mntpt = _mntpt_4zurl(zurl) mntpt = _mntpt_4zurl(zurl)
log.info("wcfs: starting for %s ...", zurl) log.info("wcfs: starting for %s ...", zurl)
...@@ -233,8 +263,7 @@ def _start(zurl, *optv): # -> WCFS ...@@ -233,8 +263,7 @@ def _start(zurl, *optv): # -> WCFS
# XXX errctx "wcfs: start" # XXX errctx "wcfs: start"
# spawn wcfs and wait till filesystem-level access to it is ready # spawn wcfs and wait till filesystem-level access to it is ready
wc = WCFS(mntpt, None, None) wcsrv = Server(mntpt, None, None)
_wcstarted.append(wc)
wg = sync.WorkGroup(context.background()) wg = sync.WorkGroup(context.background())
fsready = chan(dtype='C.structZ') fsready = chan(dtype='C.structZ')
def _(ctx): def _(ctx):
...@@ -256,24 +285,36 @@ def _start(zurl, *optv): # -> WCFS ...@@ -256,24 +285,36 @@ def _start(zurl, *optv): # -> WCFS
raise ctx.err() raise ctx.err()
if _ == 1: if _ == 1:
# startup was ok - don't monitor spawned wcfs any longer # startup was ok - don't monitor spawned wcfs any longer
wc._proc = proc wcsrv._proc = proc
return return
time.sleep(0.1) time.sleep(0.1*time.second)
wg.go(_) wg.go(_)
def _(ctx): def _(ctx):
# XXX errctx "waitmount" # XXX errctx "waitmount"
fwcfs = _waitmount(ctx, zurl, mntpt) fwcfs = _waitmount(ctx, zurl, mntpt)
wc._fwcfs = fwcfs wcsrv._fwcfs = fwcfs
fsready.close() fsready.close()
wg.go(_) wg.go(_)
wg.wait() wg.wait()
log.info("wcfs: started pid%d @ %s", wcsrv._proc.pid, mntpt)
assert mntpt not in _wcregistry fwcfs = wcsrv._fwcfs
_wcregistry[mntpt] = wc del wcsrv._fwcfs
return wc
# open fuse abort control file
# shutdown wcsrv if that open fails
try:
x = os.minor(os.stat(wcsrv.mountpoint).st_dev)
wcsrv._fuseabort = open("/sys/fs/fuse/connections/%d/abort" % x, "wb")
except:
defer(wcsrv.stop)
defer(fwcfs.close)
raise
return wcsrv, fwcfs
# _waitmount waits for wcfs filesystem for zurl @mntpt to become ready. # _waitmount waits for wcfs filesystem for zurl @mntpt to become ready.
def _waitmount(ctx, zurl, mntpt): # -> fwcfs def _waitmount(ctx, zurl, mntpt): # -> fwcfs
...@@ -298,7 +339,97 @@ def _waitmount(ctx, zurl, mntpt): # -> fwcfs ...@@ -298,7 +339,97 @@ def _waitmount(ctx, zurl, mntpt): # -> fwcfs
if _ == 0: if _ == 0:
raise ctx.err() raise ctx.err()
time.sleep(0.1) time.sleep(0.1*time.second)
@func(Server)
def __init__(wcsrv, mountpoint, proc, ffuseabort):
    # .mountpoint: path to the wcfs mountpoint this server serves
    wcsrv.mountpoint = mountpoint
    # ._proc: the wcfs subprocess (subprocess.Popen); may be filled in later
    # during startup (see _start, which creates Server(mntpt, None, None))
    wcsrv._proc = proc
    # ._fuseabort: opened /sys/fs/fuse/connections/X/abort file for this
    # server, used to force-abort the FUSE connection on unclean stop
    wcsrv._fuseabort = ffuseabort
    # ._stopOnce ensures the shutdown sequence runs at most once even if
    # stop is called multiple times
    wcsrv._stopOnce = sync.Once()
# stop shuts down the server.
#
# ctx limits how long the shutdown is allowed to take; if not provided a
# default 10s timeout is used so that stop cannot block forever.
@func(Server)
def stop(wcsrv, ctx=None):
    if ctx is None:
        ctx, cancel = context.with_timeout(context.background(), 10*time.second)
        defer(cancel)  # release timer resources when stop returns
    wcsrv._stop(ctx)
# _stop serves stop; it also accepts a test hook.
#
# _onstuck, if provided, is called instead of waiting forever when the wcfs
# process does not exit even after SIGKILL (used by tests to fail fast).
@func(Server)
def _stop(wcsrv, ctx, _onstuck=None):
    # funnel through _stopOnce so repeated stop calls are no-ops
    def _():
        wcsrv.__stop(ctx, _onstuck)
    wcsrv._stopOnce.do(_)
# __stop implements the actual shutdown sequence.
#
# The work is arranged as deferred steps which run in LIFO order when this
# function returns: (1) unmount the filesystem (force-aborting the FUSE
# connection if clean unmount fails), (2) wait for the wcfs process to exit,
# escalating SIGQUIT -> SIGKILL, (3) close the fuse abort control file.
@func(Server)
def __stop(wcsrv, ctx, _onstuck):
    log.info("wcfs: unmount/stop wcfs pid%d @ %s", wcsrv._proc.pid, wcsrv.mountpoint)

    # compute total time budget left on ctx; infinite if ctx has no deadline
    deadline = ctx.deadline()
    if deadline is None:
        deadline = float('inf')
    timeoutTotal = (deadline - time.now())
    if timeoutTotal < 0:
        timeoutTotal = 0.
    # timeoutFrac returns ctx with `timeout ~= fraction·totalTimeout`
    # however if the context is already cancelled, returned timeout is 0.1s to
    # give chance for an operation to complete.
    def timeoutFrac(fraction):
        if _ready(ctx.done()):
            tctx, _ = context.with_timeout(context.background(), 0.1*time.second)
        else:
            tctx, _ = context.with_timeout(ctx, fraction*timeoutTotal)
        return tctx

    # unmount and wait for wcfs to exit
    # (deferred steps below; remember they execute in reverse order)

    # last deferred step: release the fuse abort control file
    def _():
        if wcsrv._fuseabort is not None:
            wcsrv._fuseabort.close()
    defer(_)

    # second deferred step: make sure the wcfs process actually exits,
    # escalating signals if it does not
    @func
    def _():
        # kill wcfs.go in case it is deadlocked and does not exit by itself
        if _procwait_(timeoutFrac(0.5), wcsrv._proc):
            return
        log.warn("\nC: wcfs.go does not exit")
        log.warn("-> kill -QUIT wcfs.go ...\n")
        os.kill(wcsrv._proc.pid, SIGQUIT)
        if _procwait_(timeoutFrac(0.25), wcsrv._proc):
            return
        log.warn("\nC: wcfs.go does not exit (after SIGQUIT)")
        log.warn("-> kill -KILL wcfs.go ...\n")
        os.kill(wcsrv._proc.pid, SIGKILL)
        if _procwait_(timeoutFrac(0.25), wcsrv._proc):
            return
        log.warn("\nC: wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
        log.warn("-> nothing we can do...\n") # XXX dump /proc/pid/task/*/stack instead (ignore EPERM)
        if _onstuck is not None:
            _onstuck()
        else:
            # no hook: block until the process terminates, however long it takes
            _procwait(context.background(), wcsrv._proc)
    defer(_)

    # first deferred step: unmount the filesystem
    @func
    def _():
        try:
            if _is_mountpoint(wcsrv.mountpoint): # could be unmounted from outside
                _fuse_unmount(wcsrv.mountpoint)
        except:
            # if clean unmount failed -> force abort of fuse connection
            def _():
                if wcsrv._fuseabort is not None:
                    log.warn("aborting FUSE connection ...")
                    wcsrv._fuseabort.write(b"1\n")
                    wcsrv._fuseabort.flush()
            defer(_)
            raise
    defer(_)
# ---- misc ---- # ---- misc ----
...@@ -355,9 +486,97 @@ def _mkdir_p(path, mode=0o777): # -> created(bool) ...@@ -355,9 +486,97 @@ def _mkdir_p(path, mode=0o777): # -> created(bool)
return False return False
return True return True
# fusermount -u. # _fuse_unmount calls `fusermount -u` + logs details if unmount failed.
@func
def _fuse_unmount(mntpt): def _fuse_unmount(mntpt):
subprocess.check_call(["fusermount", "-u", mntpt]) try:
subprocess.check_call(["fusermount", "-u", mntpt], close_fds=True)
except subprocess.CalledProcessError:
# unmount failed, usually due to "device is busy".
# Log which files are still opened and reraise
def _():
log.warn("fuse_unmount %s failed", mntpt)
log.warn("# lsof %s" % mntpt)
# -w to avoid lots of
# lsof: WARNING: can't stat() fuse.wcfs file system /tmp/wcfs/X
# Output information may be incomplete.
# if there are other uncleaned wcfs mountpoints.
# (lsof stats all filesystems on startup)
# XXX -> better use `fuser -m <mntpt>` (it says it will limit search to files only under mntpt) ?
lsof = subprocess.Popen(["lsof", "-w", "+D", mntpt],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, _ = lsof.communicate()
log.warn(out)
if lsof.returncode:
log.warn("(lsof failed)")
defer(_)
raise
# _is_mountpoint returns whether path is a mountpoint.
def _is_mountpoint(path): # -> bool
    # NOTE we first stat the path ourselves instead of handing it straight to
    # `mountpoint`: if the FUSE fileserver behind it has failed, `mountpoint`
    # would also fail and print ENOTCONN noise.
    try:
        st = os.lstat(path)
    except OSError as e:
        if e.errno == ENOENT:
            return False    # nothing exists at path
        # "Transport endpoint is not connected" -> it is a failed FUSE server
        # (XXX we can also grep /proc/mounts)
        if e.errno == ENOTCONN:
            return True
        raise
    if not S_ISDIR(st.st_mode):
        return False        # only directories can be mountpoints
    # live path: delegate the final verdict to mountpoint(1)
    return subprocess.call(["mountpoint", "-q", path]) == 0
# _procwait waits for a process (subprocess.Popen) to terminate.
#
# It raises (via _waitfor) if ctx is canceled or its deadline expires before
# the process exits.
def _procwait(ctx, proc):
    _waitfor(ctx, lambda: proc.poll() is not None)
# _procwait_, similarly to _procwait, waits for a process (subprocess.Popen) to terminate.
#
# It returns bool whether the process terminated or not - e.g. False if the
# context was canceled first.
def _procwait_(ctx, proc): # -> ok
    def _terminated():
        return proc.poll() is not None
    return _waitfor_(ctx, _terminated)
# _waitfor waits for condf() to become true.
#
# condf is polled every 10ms; if ctx becomes done first, ctx.err() is raised
# (propagated out of the WorkGroup by wg.wait).
def _waitfor(ctx, condf):
    wg = sync.WorkGroup(ctx)
    def _(ctx):
        while 1:
            if _ready(ctx.done()):
                raise ctx.err()
            if condf():
                return
            time.sleep(10*time.millisecond)
    wg.go(_)
    wg.wait()
# _waitfor_, similarly to _waitfor, waits for condf() to become true.
#
# It returns bool whether the target condition was reached: False when the
# wait ended because the context was canceled or its deadline exceeded; any
# other error propagates.
def _waitfor_(ctx, condf): # -> ok
    try:
        _waitfor(ctx, condf)
        return True
    except Exception as e:
        if errors.Is(e, context.canceled) or errors.Is(e, context.deadlineExceeded):
            return False
        raise
# _ready reports whether chan ch is ready.
def _ready(ch):
    # non-blocking poll: default fires (case 0) when ch has nothing to receive
    which, _rx = select(
        default,    # 0 - not ready
        ch.recv,    # 1 - ready
    )
    return which == 1
# serve starts and runs wcfs server for ZODB @ zurl. # serve starts and runs wcfs server for ZODB @ zurl.
......
...@@ -29,7 +29,8 @@ from __future__ import print_function, absolute_import ...@@ -29,7 +29,8 @@ from __future__ import print_function, absolute_import
from golang import func, defer, error, b from golang import func, defer, error, b
from wendelin.bigfile.file_zodb import ZBigFile from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, waitfor_, eprint from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, eprint
from wendelin.wcfs import _waitfor_ as waitfor_
from wendelin.wcfs import wcfs_test from wendelin.wcfs import wcfs_test
from wendelin.wcfs.internal.wcfs_test import read_mustfault from wendelin.wcfs.internal.wcfs_test import read_mustfault
from wendelin.wcfs.internal import mm from wendelin.wcfs.internal import mm
......
...@@ -53,9 +53,8 @@ cdef class _tWCFS: ...@@ -53,9 +53,8 @@ cdef class _tWCFS:
# but pin handler is failing one way or another - select will wake-up # but pin handler is failing one way or another - select will wake-up
# but, if _abort_ontimeout uses GIL, won't continue to run trying to lock # but, if _abort_ontimeout uses GIL, won't continue to run trying to lock
# GIL -> deadlock. # GIL -> deadlock.
def _abort_ontimeout(_tWCFS t, double dt, pychan nogilready not None): def _abort_ontimeout(_tWCFS t, int fdabort, double dt, pychan nogilready not None):
cdef chan[double] timeoutch = time.after(dt) cdef chan[double] timeoutch = time.after(dt)
cdef int fdabort = t._wcfuseabort.fileno()
emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second) emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second)
cdef char *_emsg1 = emsg1 cdef char *_emsg1 = emsg1
with nogil: with nogil:
......
...@@ -43,9 +43,7 @@ from ZODB.utils import z64, u64, p64 ...@@ -43,9 +43,7 @@ from ZODB.utils import z64, u64, p64
import sys, os, os.path, subprocess import sys, os, os.path, subprocess
from thread import get_ident as gettid from thread import get_ident as gettid
from time import gmtime from time import gmtime
from errno import EINVAL, ENOENT, ENOTCONN from errno import EINVAL, ENOTCONN
from stat import S_ISDIR
from signal import SIGQUIT, SIGKILL
from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, default, error, b from golang import go, chan, select, func, defer, default, error, b
from golang import context, errors, sync, time from golang import context, errors, sync, time
...@@ -55,6 +53,7 @@ from pytest import raises, fail ...@@ -55,6 +53,7 @@ from pytest import raises, fail
from wendelin.wcfs.internal import io, mm from wendelin.wcfs.internal import io, mm
from wendelin.wcfs.internal.wcfs_test import _tWCFS, read_nogil, install_sigbus_trap, fadvise_dontneed from wendelin.wcfs.internal.wcfs_test import _tWCFS, read_nogil, install_sigbus_trap, fadvise_dontneed
from wendelin.wcfs.client._wcfs import _tpywlinkwrite as _twlinkwrite from wendelin.wcfs.client._wcfs import _tpywlinkwrite as _twlinkwrite
from wendelin.wcfs import _is_mountpoint as is_mountpoint, _procwait as procwait, _ready as ready
# setup: # setup:
...@@ -113,26 +112,9 @@ def teardown_function(f): ...@@ -113,26 +112,9 @@ def teardown_function(f):
procmounts_lookup_wcfs(testzurl) procmounts_lookup_wcfs(testzurl)
# fuse_unmount unmounts FUSE filesystem mounted @ mntpt. # fuse_unmount unmounts FUSE filesystem mounted @ mntpt.
@func
def fuse_unmount(mntpt): def fuse_unmount(mntpt):
assert is_mountpoint(mntpt) assert is_mountpoint(mntpt)
try: wcfs._fuse_unmount(mntpt)
wcfs._fuse_unmount(mntpt)
except subprocess.CalledProcessError:
# unmount failed, usually due to "device is busy".
# Print which files are still opened and reraise
def _():
print("# lsof %s" % mntpt)
# -w to avoid lots of
# lsof: WARNING: can't stat() fuse.wcfs file system /tmp/wcfs/X
# Output information may be incomplete.
# if there are other uncleaned wcfs mountpoints.
# (lsof stats all filesystems on startup)
# XXX -> better use `fuser -m <mntpt>` (it says it will limit search to files only under mntpt) ?
subprocess.check_call(["lsof", "-w", "+D", mntpt])
defer(_)
raise
# ---- test join/autostart/serve ---- # ---- test join/autostart/serve ----
...@@ -149,13 +131,18 @@ def test_join(): ...@@ -149,13 +131,18 @@ def test_join():
assert wcfs._wcregistry == {} assert wcfs._wcregistry == {}
defer(_) defer(_)
wc = wcfs._start(zurl) wcsrv = wcfs.start(zurl)
defer(wcsrv.stop)
assert wcsrv.mountpoint == testmntpt
assert readfile(wcsrv.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wcsrv.mountpoint + "/head")
assert os.path.isdir(wcsrv.mountpoint + "/head/bigfile")
wc = wcfs.join(zurl, autostart=False)
defer(wc.close) defer(wc.close)
assert wc.mountpoint == testmntpt assert wc.mountpoint == wcsrv.mountpoint
assert wc._njoin == 1 assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl assert wc._wcsrv is None
assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile")
wc2 = wcfs.join(zurl, autostart=False) wc2 = wcfs.join(zurl, autostart=False)
defer(wc2.close) defer(wc2.close)
...@@ -239,6 +226,9 @@ def test_serve_after_crash(): ...@@ -239,6 +226,9 @@ def test_serve_after_crash():
assert procmounts_lookup_wcfs(zurl) == mntpt assert procmounts_lookup_wcfs(zurl) == mntpt
# XXX test_start_after_crash?
# start_and_crash_wcfs starts wcfs and then kills it. # start_and_crash_wcfs starts wcfs and then kills it.
# it returns closed WCFS connection that was connected to the killed WCFS server. # it returns closed WCFS connection that was connected to the killed WCFS server.
def start_and_crash_wcfs(zurl, mntpt): # -> WCFS def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
...@@ -247,7 +237,11 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS ...@@ -247,7 +237,11 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
procmounts_lookup_wcfs(zurl) procmounts_lookup_wcfs(zurl)
# start the server with attached client # start the server with attached client
wc = wcfs._start(zurl) wcsrv = wcfs.start(zurl)
assert wcsrv.mountpoint == mntpt
assert mntpt not in wcfs._wcregistry
wc = wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry[mntpt] is wc assert wcfs._wcregistry[mntpt] is wc
assert wc.mountpoint == mntpt assert wc.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl assert readfile(mntpt + "/.wcfs/zurl") == zurl
...@@ -257,8 +251,8 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS ...@@ -257,8 +251,8 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
# kill the server # kill the server
wc._proc.kill() # sends SIGKILL wcsrv._proc.kill() # sends SIGKILL
assert wc._proc.wait() != 0 assert wcsrv._proc.wait() != 0
# access to filesystem should raise "Transport endpoint not connected" # access to filesystem should raise "Transport endpoint not connected"
with raises(IOError) as exc: with raises(IOError) as exc:
...@@ -335,14 +329,12 @@ class DFile: ...@@ -335,14 +329,12 @@ class DFile:
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ? # XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tWCFS(_tWCFS): class tWCFS(_tWCFS):
@func @func
def __init__(t, wc=None): def __init__(t):
# `wc != None` is used to create tWCFS on existing WCFS connection assert not os.path.exists(testmntpt)
if wc is None: wc = wcfs.join(testzurl, autostart=True)
assert not os.path.exists(testmntpt) assert wc.mountpoint == testmntpt
wc = wcfs.join(testzurl, autostart=True) assert os.path.exists(wc.mountpoint)
assert wc.mountpoint == testmntpt assert is_mountpoint(wc.mountpoint)
assert os.path.exists(wc.mountpoint)
assert is_mountpoint(wc.mountpoint)
t.wc = wc t.wc = wc
# force-unmount wcfs on timeout to unstuck current test and let it fail. # force-unmount wcfs on timeout to unstuck current test and let it fail.
...@@ -351,60 +343,35 @@ class tWCFS(_tWCFS): ...@@ -351,60 +343,35 @@ class tWCFS(_tWCFS):
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel. # cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to # ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal ) # still wait for request completion even after fatal signal )
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.minor(os.stat(t.wc.mountpoint).st_dev), "w")
nogilready = chan(dtype='C.structZ') nogilready = chan(dtype='C.structZ')
go(t._abort_ontimeout, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout t._wcfuseabort = os.dup(wc._wcsrv._fuseabort.fileno())
go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil nogilready.recv() # wait till _abort_ontimeout enters nogil
# close closes connection to wcfs, unmounts the filesystem and makes sure # close closes connection to wcfs, unmounts the filesystem and makes sure
# that wcfs server exits. # that wcfs server exits.
@func @func
def close(t): def close(t):
defer(t._wcfuseabort.close) def _():
os.close(t._wcfuseabort)
defer(t._closed.close) defer(t._closed.close)
# unmount and wait for wcfs to exit # unmount and wait for wcfs to exit
def _(): def _():
# run `fusermount -u` the second time after if wcfs was killed to
# cleanup /proc/mounts.
if is_mountpoint(t.wc.mountpoint):
fuse_unmount(t.wc.mountpoint)
assert not is_mountpoint(t.wc.mountpoint) assert not is_mountpoint(t.wc.mountpoint)
os.rmdir(t.wc.mountpoint) os.rmdir(t.wc.mountpoint)
defer(_) defer(_)
@func
def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself
if procwait_(timeout(), t.wc._proc):
return
# run `fusermount -u` the second time after we kill wcfs to cleanup
# /proc/mounts and avoid `assert not is_mountpoint` above.
def _():
if is_mountpoint(t.wc.mountpoint):
fuse_unmount(t.wc.mountpoint)
defer(_)
eprint("\nC: wcfs.go does not exit")
eprint("-> kill -QUIT wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGQUIT)
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit (after SIGQUIT)")
eprint("-> kill -KILL wcfs.go ...\n")
os.kill(t.wc._proc.pid, SIGKILL)
if procwait_(timeout(), t.wc._proc):
return
eprint("\nC: wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
eprint("-> nothing we can do...\n") # XXX dump /proc/pid/task/*/stack instead (ignore EPERM)
fail("wcfs.go does not exit even after SIGKILL")
defer(_)
def _(): def _():
#if not ready(t._wcfuseaborted): XXX kill _wcfuseaborted ? def onstuck():
# assert 0 == subprocess.call(["mountpoint", "-q", t.wc.mountpoint]) fail("wcfs.go does not exit even after SIGKILL")
assert is_mountpoint(t.wc.mountpoint) t.wc._wcsrv._stop(timeout(), _onstuck=onstuck)
fuse_unmount(t.wc.mountpoint)
defer(_) defer(_)
defer(t.wc.close)
t.wc.close() assert is_mountpoint(t.wc.mountpoint)
class tDB(tWCFS): class tDB(tWCFS):
...@@ -1971,72 +1938,6 @@ def dump_history(t): ...@@ -1971,72 +1938,6 @@ def dump_history(t):
print() print()
# ready reports whether chan ch is ready.
def ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
return bool(_)
# procwait waits for a process (subprocess.Popen) to terminate.
def procwait(ctx, proc):
waitfor(ctx, lambda: proc.poll() is not None)
# procwait_, similarly to procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns bool whether process terminated or not - e.g. due to context being canceled.
def procwait_(ctx, proc): # -> ok
return waitfor_(ctx, lambda: proc.poll() is not None)
# waitfor waits for condf() to become true.
def waitfor(ctx, condf):
wg = sync.WorkGroup(ctx)
def _(ctx):
while 1:
if ready(ctx.done()):
raise ctx.err()
if condf():
return
tdelay()
wg.go(_)
wg.wait()
# waitfor_, similarly to waitfor, waits for condf() to become true.
#
# it returns bool whether target condition was reached or not - e.g. due to
# context being canceled.
def waitfor_(ctx, proc): # -> ok
try:
waitfor(ctx, proc)
except Exception as e:
if errors.Is(e, context.canceled) or errors.Is(e, context.deadlineExceeded):
return False
raise
return True
# is_mountpoint returns whether path is a mountpoint
def is_mountpoint(path): # -> bool
# NOTE we don't call mountpoint directly on path, because if FUSE
# fileserver failed, the mountpoint will also fail and print ENOTCONN
try:
_ = os.lstat(path)
except OSError as e:
if e.errno == ENOENT:
return False
# "Transport endpoint is not connected" -> it is a failed FUSE server
# (XXX we can also grep /proc/mounts)
if e.errno == ENOTCONN:
return True
raise
if not S_ISDIR(_.st_mode):
return False
mounted = (0 == subprocess.call(["mountpoint", "-q", path]))
return mounted
# procmounts_lookup_wcfs returns /proc/mount entry for wcfs mounted to serve zurl. # procmounts_lookup_wcfs returns /proc/mount entry for wcfs mounted to serve zurl.
def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError
for line in readfile('/proc/mounts').splitlines(): for line in readfile('/proc/mounts').splitlines():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment