Commit 7c1cc95d authored by Kirill Smelkov

Merge branch 't' into t2

* t:
  X wcfs: Teach join/serve to start successfully even after unclean wcfs shutdown
  X wcfs: Switch mountpoints from /tmp/wcfs/* to /dev/shm/*
  X fixup! X lib/zodb: zstor_2zurl: Explicitly reject MappingStorage
parents 132705f6 b0ca031f
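
In short, the recovery added by this merge probes <mntpt>/.wcfs/zurl and distinguishes a clean unmount (open fails with ENOENT) from a crashed or killed server that left a dangling FUSE mount behind (open fails with ENOTCONN); in the crashed case the stale mount is removed with fusermount -u before a new server is started. A minimal standalone sketch of that probe, with illustrative names not taken from the patch:

    import subprocess
    from errno import ENOENT, ENOTCONN

    def probe_wcfs(mntpt):  # -> "running" | "clean" | "crashed"
        # a live wcfs server exposes the URL of its storage in .wcfs/zurl
        try:
            with open(mntpt + "/.wcfs/zurl") as f:
                f.read()
            return "running"
        except IOError as e:
            if e.errno == ENOENT:    # nothing mounted - clean state
                return "clean"
            if e.errno == ENOTCONN:  # mountpoint exists, but the server is dead
                return "crashed"
            raise

    def recover_if_crashed(mntpt):
        # detach the dead FUSE mount so a fresh wcfs can be mounted at mntpt
        if probe_wcfs(mntpt) == "crashed":
            subprocess.check_call(["fusermount", "-u", mntpt])
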
@@ -373,9 +373,9 @@ def zstor_2zurl(zstor):
# MappingStorage:
if ztype == "ZODB.MappingStorage.MappingStorage":
raise ValueError("%s is in-RAM storage\n" +
raise ValueError(("%s is in-RAM storage\n" +
"\tin-RAM storages are not supported:\n" +
"\ta zurl pointing to in-RAM storage in one process would lead to\n"
"\tanother in-RAM storage in WCFS process." % ztype)
"\tanother in-RAM storage in WCFS process.") % ztype)
raise NotImplementedError("don't know how to extract zurl from %r" % zstor)
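
The fixup in this hunk is about operator precedence: % binds tighter than +, so without the added parentheses the format argument is applied only to the last string literal and the raise itself fails with TypeError instead of producing the intended message. A small illustration (values are made up):

    x = "ZODB.MappingStorage.MappingStorage"
    # without parentheses % formats only the last literal, which has no %s:
    #   "%s is bad\n" + "details." % x  ==  "%s is bad\n" + ("details." % x)  -> TypeError
    # with parentheses the whole concatenated template is formatted:
    msg = ("%s is bad\n" +
           "details.") % x    # -> "ZODB.MappingStorage.MappingStorage is bad\ndetails."
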
@@ -63,10 +63,10 @@ The following environment variables can be used to control wcfs.py client:
from __future__ import print_function, absolute_import
import os, sys, hashlib, tempfile, subprocess, time, stat
import os, sys, hashlib, subprocess, time, stat
import logging as log
from os.path import dirname
from errno import ENOENT, EEXIST
from errno import ENOENT, ENOTCONN, EEXIST
from golang import chan, select, default, func, defer
from golang import sync, context
@@ -159,10 +159,11 @@ def close(wc):
with _wcmu:
wc._njoin -= 1
if wc._njoin == 0:
del _wcregistry[wc.mountpoint]
# NOTE not unmounting wcfs - it either runs as a separate service, or
# is spawned on demand with -autoexit.
# NOTE ._fwcfs.close can raise IOError (e.g. ENOTCONN after wcfs server crash)
wc._fwcfs.close()
del _wcregistry[wc.mountpoint]
# _default_autostart returns default autostart setting for join.
#
@@ -177,7 +178,7 @@ def _default_autostart():
# join connects to wcfs server for ZODB @ zurl.
#
# If wcfs for that zurl was already started, join connects to it.
# If wcfs for that zurl is already running, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True.
#
# For the same zurl join returns the same WCFS object.
@@ -190,11 +191,16 @@ def join(zurl, autostart=_default_autostart()): # -> WCFS
wc._njoin += 1
return wc
# no. try opening .wcfs - if we succeed - wcfs is already mounted.
# no. try opening .wcfs - if we succeed - wcfs is already running.
unclean = False
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno != ENOENT:
if e.errno == ENOENT: # wcfs cleanly unmounted
pass
elif e.errno == ENOTCONN: # wcfs crashed/killed
unclean = True
else:
raise
else:
# already have it
@@ -203,11 +209,13 @@ def join(zurl, autostart=_default_autostart()): # -> WCFS
return wc
if not autostart:
raise RuntimeError("wcfs: join %s: server not started" % zurl)
raise RuntimeError("wcfs: join %s: server not running" % zurl)
# start wcfs, telling it to automatically exit when there is no client activity.
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
if unclean:
_fuse_unmount(mntpt)
return _start(zurl, "-autoexit")
@@ -256,29 +264,9 @@ def _start(zurl, *optv): # -> WCFS
def _(ctx):
# XXX errctx "waitmount"
while 1:
try:
f = open("%s/.wcfs/zurl" % mntpt)
except IOError as e:
if e.errno != ENOENT:
raise
else:
dotwcfs = f.read()
if dotwcfs != zurl:
raise RuntimeError(".wcfs/zurl != zurl (%s != %s)" % (qq(dotwcfs), qq(zurl)))
wc._fwcfs = f
fsready.close()
return
_, _rx = select(
ctx.done().recv, # 0
default, # 1
)
if _ == 0:
raise ctx.err()
time.sleep(0.1)
fwcfs = _waitmount(ctx, zurl, mntpt)
wc._fwcfs = fwcfs
fsready.close()
wg.go(_)
wg.wait()
@@ -287,6 +275,31 @@ def _start(zurl, *optv): # -> WCFS
_wcregistry[mntpt] = wc
return wc
# _waitmount waits for wcfs filesystem for zurl @mntpt to become ready.
def _waitmount(ctx, zurl, mntpt): # -> fwcfs
while 1:
try:
f = open("%s/.wcfs/zurl" % mntpt)
except IOError as e:
# ENOTCONN (wcfs crashed/killed) is an error here
if e.errno != ENOENT:
raise
else:
dotwcfs = f.read()
if dotwcfs != zurl:
raise RuntimeError(".wcfs/zurl != zurl (%s != %s)" % (qq(dotwcfs), qq(zurl)))
return f
_, _rx = select(
ctx.done().recv, # 0
default, # 1
)
if _ == 0:
raise ctx.err()
time.sleep(0.1)
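
_waitmount polls until the control file appears, or fails as soon as the context is done. A hedged usage sketch with pygolang's context (assuming context.with_timeout is available, as in pygolang; zurl value and import path are illustrative):

    from golang import context
    import wcfs   # or: from wendelin import wcfs, depending on how the package is installed

    zurl  = "neo://cluster@master"      # illustrative storage URL
    mntpt = wcfs._mntpt_4zurl(zurl)

    # give the filesystem up to ~10 seconds to come up at mntpt
    ctx, cancel = context.with_timeout(context.background(), 10)
    try:
        fwcfs = wcfs._waitmount(ctx, zurl, mntpt)   # file object kept open to pin the mount
    finally:
        cancel()
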
# ---- misc ----
@@ -310,8 +323,12 @@ def _mntpt_4zurl(zurl):
m = hashlib.sha1()
m.update(zurl)
# mkdir /tmp/wcfs with sticky bit. This way multiple users can create subdirectories inside.
wcfsroot = "%s/wcfs" % (tempfile.gettempdir())
# WCFS mounts are located under /dev/shm/wcfs. /dev/shm is already used by
# userspace part of wendelin.core memory manager for dirtied pages.
# In a sense WCFS mount provides shared read-only memory backed by ZODB.
# mkdir /dev/shm/wcfs with sticky bit. This way multiple users can create subdirectories inside.
wcfsroot = "/dev/shm/wcfs"
wcfsmode = 0o777 | stat.S_ISVTX
if _mkdir_p(wcfsroot):
os.chmod(wcfsroot, wcfsmode)
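
For reference, the mountpoint is derived from zurl by hashing; under the assumption that the rest of _mntpt_4zurl is unchanged by this patch (the return expression lies outside this hunk), the mapping is roughly:

    import hashlib

    def mntpt_for_zurl(zurl):   # illustrative sketch, not the patched function itself
        # one mountpoint per storage URL, named by the SHA1 of the URL,
        # located under the world-writable (sticky) /dev/shm/wcfs root
        if not isinstance(zurl, bytes):
            zurl = zurl.encode('utf-8')
        return "/dev/shm/wcfs/" + hashlib.sha1(zurl).hexdigest()
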
@@ -338,37 +355,50 @@ def _mkdir_p(path, mode=0o777): # -> created(bool)
return False
return True
# fusermount -u.
def _fuse_unmount(mntpt):
subprocess.check_call(["fusermount", "-u", mntpt])
# serve starts and runs wcfs server for ZODB @ zurl.
#
# it mounts wcfs at a location that is in 1-1 correspondence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error).
#
# it is an error if wcfs was already started.
# it is an error if wcfs is already running.
#
# optv is list of options to pass to wcfs server.
# if exec_ is True, wcfs is not spawned, but executed into.
#
# serve(zurl, exec_=False).
def serve(zurl, optv, exec_=False):
def serve(zurl, optv, exec_=False, _tstartingq=None):
mntpt = _mntpt_4zurl(zurl)
# XXX take $WENDELIN_CORE_WCFS_OPTIONS into account?
# try opening .wcfs - it is an error if we can do it.
# XXX -> option to wcfs itself to verify wcfs/something is already mounted?
unclean = False
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno != ENOENT:
if e.errno == ENOENT: # wcfs cleanly unmounted
pass
elif e.errno == ENOTCONN: # wcfs crashed/killed
unclean = True
else:
raise
else:
f.close()
raise RuntimeError("wcfs: start %s: already started" % zurl)
raise RuntimeError("wcfs: start %s: already running" % zurl)
# seems to be ok to start
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
if unclean:
_fuse_unmount(mntpt)
if _tstartingq is not None:
_tstartingq.close()
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
if not exec_:
subprocess.check_call(argv, close_fds=True)
@@ -411,7 +441,7 @@ def main():
elif cmd == "stop":
mntpt = _mntpt_4zurl(zurl)
subprocess.check_call(["fusermount", "-u", mntpt])
_fuse_unmount(mntpt)
else:
print("wcfs: unknown command %s" % qq(cmd), file=sys.stderr)
@@ -98,6 +98,8 @@ def teardown_module():
# make sure we start every test without wcfs server running.
def setup_function(f):
assert not os.path.exists(testmntpt)
with raises(KeyError):
procmounts_lookup_wcfs(testzurl)
# make sure we unmount wcfs after every test.
# (tDB checks this in more detail, but join tests don't use tDB)
@@ -107,13 +109,15 @@ def teardown_function(f):
fuse_unmount(testmntpt)
if os.path.exists(testmntpt):
os.rmdir(testmntpt)
with raises(KeyError):
procmounts_lookup_wcfs(testzurl)
# fuse_unmount unmounts FUSE filesystem mounted @ mntpt.
@func
def fuse_unmount(mntpt):
assert is_mountpoint(mntpt)
try:
subprocess.check_call(["fusermount", "-u", mntpt])
wcfs._fuse_unmount(mntpt)
except subprocess.CalledProcessError:
# unmount failed, usually due to "device is busy".
# Print which files are still opened and reraise
@@ -131,13 +135,13 @@ def fuse_unmount(mntpt):
# ---- test join/autostart ----
# ---- test join/autostart/serve ----
# test that join works.
@func
def test_join():
zurl = testzurl
with raises(RuntimeError, match="wcfs: join .*: server not started"):
with raises(RuntimeError, match="wcfs: join .*: server not running"):
wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
@@ -162,7 +166,7 @@ def test_join():
@func
def test_join_autostart():
zurl = testzurl
with raises(RuntimeError, match="wcfs: join .*: server not started"):
with raises(RuntimeError, match="wcfs: join .*: server not running"):
wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
@@ -179,6 +183,101 @@ def test_join_autostart():
assert os.path.isdir(wc.mountpoint + "/head/bigfile")
# verify that join successfully starts wcfs if previous wcfs exited uncleanly.
@func
def test_join_after_crash():
zurl = testzurl
mntpt = testmntpt
wc = start_and_crash_wcfs(zurl, mntpt)
# start the server again - it should start ok despite the fact that the FUSE
# connection to the previously aborted wcfs is still there
wc2 = wcfs.join(zurl, autostart=True)
assert wc2 is not wc
assert wcfs._wcregistry[mntpt] is wc2
assert wc2.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
# /proc/mounts should contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
# stop the server
wc2.close()
fuse_unmount(mntpt)
# /proc/mounts entry should be gone
with raises(KeyError):
procmounts_lookup_wcfs(zurl)
# verify that serve successfully starts if previous wcfs exited uncleanly.
@func
def test_serve_after_crash():
zurl = testzurl
mntpt = testmntpt
wc = start_and_crash_wcfs(zurl, mntpt)
serve_starting = chan(dtype='C.structZ')
serve_done = chan(dtype='C.structZ')
@func
def _():
defer(serve_done.close)
wcfs.serve(zurl, [], _tstartingq=serve_starting)
go(_)
def _():
fuse_unmount(mntpt)
serve_done.recv()
defer(_)
serve_starting.recv() # wait until serve is about to spawn wcfs after cleanup
wcfs._waitmount(timeout(), zurl, mntpt)
assert readfile(mntpt + "/.wcfs/zurl") == zurl
assert procmounts_lookup_wcfs(zurl) == mntpt
# start_and_crash_wcfs starts wcfs and then kills it.
# it returns the closed WCFS connection that was connected to the killed WCFS server.
def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
# /proc/mounts should not contain wcfs entry
with raises(KeyError):
procmounts_lookup_wcfs(zurl)
# start the server with attached client
wc = wcfs._start(zurl)
assert wcfs._wcregistry[mntpt] is wc
assert wc.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
# /proc/mounts should now contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
# kill the server
wc._proc.kill() # sends SIGKILL
assert wc._proc.wait() != 0
# access to filesystem should raise "Transport endpoint not connected"
with raises(IOError) as exc:
readfile(mntpt + "/.wcfs/zurl")
assert exc.value.errno == ENOTCONN
# client close should also raise "Transport endpoint not connected" but remove wc from _wcregistry
assert wcfs._wcregistry[mntpt] is wc
with raises(IOError) as exc:
wc.close()
assert exc.value.errno == ENOTCONN
assert mntpt not in wcfs._wcregistry
# /proc/mounts should still contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
return wc
# ---- infrastructure for data access tests ----
#
# Testing infrastructure consists of tDB, tFile, tWatch and tWatchLink that
@@ -1938,6 +2037,16 @@ def is_mountpoint(path): # -> bool
mounted = (0 == subprocess.call(["mountpoint", "-q", path]))
return mounted
# procmounts_lookup_wcfs returns the /proc/mounts entry for wcfs mounted to serve zurl.
def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError
for line in readfile('/proc/mounts').splitlines():
# <zurl> <mountpoint> fuse.wcfs ...
zurl_, mntpt, typ, _ = line.split(None, 3)
if typ != 'fuse.wcfs':
continue
if zurl_ == zurl:
return mntpt
raise KeyError("lookup wcfs %s: no /proc/mounts entry" % zurl)
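
As a usage note: the lookup relies on wcfs registering the mount with its zurl as the filesystem source and fuse.wcfs as the type, so a matching /proc/mounts line looks roughly like the hypothetical example below and the second field is returned:

    # hypothetical /proc/mounts line for a wcfs mount:
    #   neo://cluster@master /dev/shm/wcfs/<sha1> fuse.wcfs rw,nosuid,nodev 0 0
    mntpt = procmounts_lookup_wcfs("neo://cluster@master")   # -> "/dev/shm/wcfs/<sha1>"
    # no matching entry -> KeyError
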
# eprint prints msg to stderr
def eprint(msg):