Commit 1335412f authored by Kirill Smelkov's avatar Kirill Smelkov

Y wcfs: Switch to work with mount entries instead of mountpoints

Because mount entries provide more information compared to just one
string mountpoint. For example later for `wcfs status` and `wcfs stop`
we will need to use the ID of a "device" that is attached to the mount,
and also the type of the filesystem that is serving the mount.

-> Introduce internal.os.MountDB to retrieve information from OS
   registry of mounted filesystems and use its entries instead of plain
   mountpoint string.

wcfs_test.py already had some rudimentary code to parse /proc/mounts
which we also replace with querying MountDB.

The API of MountDB might be viewed as a bit of overkill but it will
align with API of upcoming ProcDB for which it will be reasonable.
parent f02bad18
...@@ -85,12 +85,14 @@ from wendelin.wcfs.client._wcfs import \ ...@@ -85,12 +85,14 @@ from wendelin.wcfs.client._wcfs import \
PyWCFS as _WCFS, \ PyWCFS as _WCFS, \
PyWatchLink as WatchLink \ PyWatchLink as WatchLink \
from wendelin.wcfs.internal import os as xos
# Server represents running wcfs server. # Server represents running wcfs server.
# #
# Use start to create it. # Use start to create it.
class Server: class Server:
# .mountpoint path to wcfs mountpoint # ._mnt mount entry
# ._proc wcfs process # ._proc wcfs process
# ._fuseabort opened /sys/fs/fuse/connections/X/abort for this server # ._fuseabort opened /sys/fs/fuse/connections/X/abort for this server
# ._stopOnce # ._stopOnce
...@@ -295,7 +297,7 @@ def _start(zurl, *optv): # -> Server, fwcfs ...@@ -295,7 +297,7 @@ def _start(zurl, *optv): # -> Server, fwcfs
# XXX errctx "wcfs: start" # XXX errctx "wcfs: start"
# spawn wcfs and wait till filesystem-level access to it is ready # spawn wcfs and wait till filesystem-level access to it is ready
wcsrv = Server(mntpt, None, None) wcsrv = Server(None, None, None)
wg = sync.WorkGroup(context.background()) wg = sync.WorkGroup(context.background())
fsready = chan(dtype='C.structZ') fsready = chan(dtype='C.structZ')
def _(ctx): def _(ctx):
...@@ -331,6 +333,7 @@ def _start(zurl, *optv): # -> Server, fwcfs ...@@ -331,6 +333,7 @@ def _start(zurl, *optv): # -> Server, fwcfs
wg.go(_) wg.go(_)
wg.wait() wg.wait()
wcsrv._mnt = _lookup_mnt(mntpt)
log.info("started pid%d @ %s", wcsrv._proc.pid, mntpt) log.info("started pid%d @ %s", wcsrv._proc.pid, mntpt)
fwcfs = wcsrv._fwcfs fwcfs = wcsrv._fwcfs
...@@ -375,12 +378,18 @@ def _waitmount(ctx, zurl, mntpt): # -> fwcfs ...@@ -375,12 +378,18 @@ def _waitmount(ctx, zurl, mntpt): # -> fwcfs
@func(Server) @func(Server)
def __init__(wcsrv, mountpoint, proc, ffuseabort): def __init__(wcsrv, mnt, proc, ffuseabort):
wcsrv.mountpoint = mountpoint wcsrv._mnt = mnt
wcsrv._proc = proc wcsrv._proc = proc
wcsrv._fuseabort = ffuseabort wcsrv._fuseabort = ffuseabort
wcsrv._stopOnce = sync.Once() wcsrv._stopOnce = sync.Once()
# mountpoint returns path to wcfs mountpoint.
@func(Server)
@property
def mountpoint(wcsrv):
return wcsrv._mnt.point
# stop shutdowns the server. # stop shutdowns the server.
@func(Server) @func(Server)
def stop(wcsrv, ctx=None): def stop(wcsrv, ctx=None):
...@@ -528,6 +537,22 @@ def _mntpt_4zurl(zurl): ...@@ -528,6 +537,22 @@ def _mntpt_4zurl(zurl):
_mkdir_p(mntpt) _mkdir_p(mntpt)
return mntpt return mntpt
# _lookup_mnt returns mount entry corresponding to mntpt mountpoint.
def _lookup_mnt(mntpt, nomount_ok=False): # -> xos.Mount (| None if nomount_ok)
mdbc = xos.MountDB.open()
_ = mdbc.query(lambda mnt: mnt.point == mntpt)
_ = list(_)
if len(_) == 0:
if nomount_ok:
return None
raise RuntimeError("no mount entry for %s" % mntpt)
if len(_) > 1:
# NOTE if previous wcfs was lazy unmounted - there won't be multiple mount entries
# because MNT_DETACH (what lazy-unmount uses) removes entry from mount registry
raise RuntimeError("multiple mount entries for %s" % mntpt)
mnt = _[0]
return mnt
# mkdir -p. # mkdir -p.
def _mkdir_p(path, mode=0o777): # -> created(bool) def _mkdir_p(path, mode=0o777): # -> created(bool)
...@@ -554,19 +579,30 @@ class _FUSEUnmountError(RuntimeError): ...@@ -554,19 +579,30 @@ class _FUSEUnmountError(RuntimeError):
pass pass
@func @func
def _fuse_unmount(mntpt, *optv): def _fuse_unmount(mntpt, *optv):
ret, out = _sysproccallout(["fusermount", "-u"] + list(optv) + [mntpt]) mdbc = xos.MountDB.open()
_ = mdbc.query(lambda mnt: mnt.point == mntpt)
_ = list(_)
if len(_) == 0:
raise RuntimeError("not a mountpoint: %s" % mntpt)
assert len(_) == 1, _
mnt = _[0]
return _mnt_fuse_unmount(mnt, *optv)
@func
def _mnt_fuse_unmount(mnt, *optv):
ret, out = _sysproccallout(["fusermount", "-u"] + list(optv) + [mnt.point])
if ret != 0: if ret != 0:
# unmount failed, usually due to "device is busy". # unmount failed, usually due to "device is busy".
# Log which files are still opened and reraise # Log which files are still opened and reraise
def _(): def _():
log.warn("# lsof %s" % mntpt) log.warn("# lsof %s" % mnt.point)
# -w to avoid lots of # -w to avoid lots of
# lsof: WARNING: can't stat() fuse.wcfs file system /dev/shm/wcfs/X # lsof: WARNING: can't stat() fuse.wcfs file system /dev/shm/wcfs/X
# Output information may be incomplete. # Output information may be incomplete.
# if there are other uncleaned wcfs mountpoints. # if there are other uncleaned wcfs mountpoints.
# (lsof stats all filesystems on startup) # (lsof stats all filesystems on startup)
# NOTE lsof +D misbehaves - don't use it # NOTE lsof +D misbehaves - don't use it
ret, out = _sysproccallout(["lsof", "-w", mntpt]) ret, out = _sysproccallout(["lsof", "-w", mnt.point])
log.warn(out) log.warn(out)
if ret: if ret:
log.warn("(lsof failed)") log.warn("(lsof failed)")
...@@ -575,8 +611,8 @@ def _fuse_unmount(mntpt, *optv): ...@@ -575,8 +611,8 @@ def _fuse_unmount(mntpt, *optv):
# XXX fuser should work where lsof starts to fail after wcfs going to EIO mode # XXX fuser should work where lsof starts to fail after wcfs going to EIO mode
# ref:ZzO3wtEdQVDw5Wz5@deca.navytux.spb.ru # ref:ZzO3wtEdQVDw5Wz5@deca.navytux.spb.ru
def _(): def _():
log.warn("# fuser -vmM %s" % mntpt) log.warn("# fuser -vmM %s" % mnt.point)
ret, out = _sysproccallout(["fuser", "-vmM", mntpt]) ret, out = _sysproccallout(["fuser", "-vmM", mnt.point])
log.warn(out) log.warn(out)
if ret: if ret:
log.warn("(fuser failed)") log.warn("(fuser failed)")
...@@ -586,7 +622,7 @@ def _fuse_unmount(mntpt, *optv): ...@@ -586,7 +622,7 @@ def _fuse_unmount(mntpt, *optv):
opts = ' '.join(optv) opts = ' '.join(optv)
if opts != '': if opts != '':
opts += ' ' opts += ' '
emsg = "fuse_unmount %s%s: failed: %s" % (opts, mntpt, out) emsg = "fuse_unmount %s%s: failed: %s" % (opts, mnt.point, out)
log.warn(emsg) log.warn(emsg)
raise _FUSEUnmountError("%s\n(more details logged)" % emsg) raise _FUSEUnmountError("%s\n(more details logged)" % emsg)
......
...@@ -21,11 +21,155 @@ ...@@ -21,11 +21,155 @@
"""Package wcfs.internal.os complements operating system facilities provided by """Package wcfs.internal.os complements operating system facilities provided by
standard package os. standard package os.
- MountDB, MountDBConn and Mount provide access to database of mounted filesystems.
- readfile and writefile are handy utilities to read/write a file as a whole. - readfile and writefile are handy utilities to read/write a file as a whole.
""" """
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
import os
from golang import func
# ISOLATION_* specify isolation levels.
#
# snapshot: full state is observed at transaction start and remains constant
# repeatable-read: state parts are observed on access, but observed once remain constant
# none: no isolation from simultaneous changes
ISOLATION_SNAPSHOT = "snapshot"
ISOLATION_REPEATABLE_READ = "repeatable-read"
ISOLATION_NONE = "none"
# MountDB represents database of mounts.
#
# .open(isolation_level) creates read-only MountDBConn view of this database.
class MountDB(object):
__slots__ = ()
# MountDBConn provides view to MountDB.
#
# Semantically it is {} mntid -> Mount; use get for point lookup.
# Query can be also used to find mount entries selected by arbitrary condition.
#
# It is safe to use MountDBConn only from single thread simultaneously.
class MountDBConn(object):
__slots__ = (
'_observed', # {} mntid -> Mount
)
# Mount represents one mount entry of Linux kernel.
#
# See https://www.man7.org/linux/man-pages/man5/proc_pid_mountinfo.5.html and
# https://www.kernel.org/doc/html/latest/filesystems/proc.html for details.
#
# It is safe to use MountDB only from single thread simultaneously.
class Mount(object):
__slots__ = (
'id', # int
'parent_id', # int
'dev', # dev_t (major:minor composed into one int)
'root', # str
'point', # str
'options', # str
'tags', # {} str->str|True
'fstype', # str
'fssrc', # str
'super_options', # str
)
# ---- str/parse ----
@func(MountDBConn)
def __str__(mdbc):
s = ""
for mnt in mdbc.query(lambda mnt: True):
s += "%s\n" % mnt
return s
@func(Mount)
def __str__(mnt):
s = "%d %d %d:%d %s %s %s" % (mnt.id, mnt.parent_id,
os.major(mnt.dev), os.minor(mnt.dev), mnt.root, mnt.point, mnt.options)
for k,v in mnt.tags.items():
s += " %s" % (k,)
if v is not True:
s += ":%s" % (v,)
s += " - %s %s %s" % (mnt.fstype, mnt.fssrc, mnt.super_options)
return s
@func(Mount)
@staticmethod
def _parse(line): # -> Mount
line = line.rstrip()
assert '\n' not in line, line
v = line.split()
mnt = Mount()
# XXX error ctx if something fails
mnt.id = int(v[0])
mnt.parent_id = int(v[1])
major, minor = v[2].split(':')
mnt.dev = os.makedev(int(major), int(minor))
mnt.root = v[3]
mnt.point = v[4]
mnt.options = v[5]
v = v[6:]
sep = v.index('-')
mnt.tags = {}
for _ in v[:sep]:
if ':' in _:
k, kv = _.split(':')
else:
k, kv = _, True
assert k not in mnt.tags, k # XXX duplicate tags
mnt.tags[k] = kv
v = v[sep+1:]
mnt.fstype = v[0]
mnt.fssrc = v[1]
mnt.super_options = v[2]
return mnt
# ---- open ----
# open opens connection to mount database.
@func(MountDB)
@staticmethod
def open(isolation_level=ISOLATION_SNAPSHOT): # -> MountDBConn
assert isolation_level in (ISOLATION_SNAPSHOT, ISOLATION_REPEATABLE_READ,
ISOLATION_NONE), isolation_level
if isolation_level != ISOLATION_SNAPSHOT:
raise NotImplementedError("support for isolation_level %r is not implemented" % isolation_level)
mdbc = MountDBConn()
mdbc._observed = {}
for line in readfile("/proc/self/mountinfo").splitlines():
mnt = Mount._parse(line)
assert mnt.id not in mdbc._observed, mnt # XXX dup mount entry
mdbc._observed[mnt.id] = mnt
return mdbc
# ---- get/query ----
# get looks up mount by its mntid.
#
# None is returned if there is no such process.
@func(MountDBConn)
def get(mdbc, mntid): # -> Mount | None
return mdbc._observed.get(mntid, None)
# query returns mount entries selected by selectf.
@func(MountDBConn)
def query(mdbc, selectf): # -> i[]Mount
for mntid in sorted(mdbc._observed.keys()):
mnt = mdbc._observed[mntid]
if selectf(mnt):
yield mnt
# ---- misc ----
# readfile reads file @ path. # readfile reads file @ path.
def readfile(path): def readfile(path):
......
...@@ -2116,14 +2116,17 @@ def dump_history(t): ...@@ -2116,14 +2116,17 @@ def dump_history(t):
# procmounts_lookup_wcfs returns /proc/mount entry for wcfs mounted to serve zurl. # procmounts_lookup_wcfs returns /proc/mount entry for wcfs mounted to serve zurl.
def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError
for line in xos.readfile('/proc/mounts').splitlines(): mdbc = xos.MountDB.open()
# <zurl> <mountpoint> fuse.wcfs ... _ = mdbc.query(lambda mnt: mnt.fstype == 'fuse.wcfs' and mnt.fssrc == zurl)
zurl_, mntpt, typ, _ = line.split(None, 3) _ = list(_)
if typ != 'fuse.wcfs': if len(_) == 0:
continue raise KeyError("lookup wcfs %s: no mountdb entry" % zurl)
if zurl_ == zurl: if len(_) > 1:
return mntpt raise KeyError("lookup wcfs %s: multiple mountdb entries:%s" % (zurl,
raise KeyError("lookup wcfs %s: no /proc/mounts entry" % zurl) ''.join('\n\t%s' % mnt for mnt in _)))
mnt = _[0]
return mnt.point
# eprint prints msg to stderr # eprint prints msg to stderr
def eprint(msg): def eprint(msg):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment