Commit 6f0cdaff authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Provide isolation to clients

Via custom isolation protocol that both server and clients must cooperatively
follow. This is the core change that enables file cache to be practically
shared while each client can still be provided with isolated view of the database.

This patch brings only server changes, tests + the minimum client bits to support the tests.
The client library, that will implement isolation protocol on client side, will come next.

This patch is organized as follows:

- wcfs.go brings in description of the protocol, overview of how server
  implements that protocol and the implementation itself.
  See also notes.txt

- wcfs_test.py brings in tests for server implementation.
  tWCFS._abort_ontimeout had to be moved into nogil mode into wcfs_test.pyx
  to avoid deadlock on the GIL (see comments in wcfs_test.pyx for details).

- files added in wcfs/client/ are needed to provide client-side
  implementation of WatchLink - the message exchange protocol over
  opened head/watch file - for tests. Client-side watchlink implementation
  lives in wcfs/client/wcfs_watchlink.{h,cpp}. The other additions in
  wcfs/client/ are to support that and to expose the WatchLink to Python.

  Client-side bits are done right in C++ because upcoming WCFS client
  library will be implemented in C++ to work in nogil mode in order to
  avoid deadlock on the GIL because client-side pinner thread might be
  woken-up synchronously by WCFS server at any moment, including when
  another client thread already holds the GIL and is paused by WCFS.

Some preliminary history:

9b4a42a3    X invalidation design draftly settled
27d91d47    X δFtail settled
c27c1940    X mmap over under pagefault to this mmapping works
d36b171f    X ptrace when client is under pagefault or syscall won't work
c1f5bb19    X notes on why lazy-invalidate approach was taken
4fbdd270    X Proof that that it is possible to change mmapping while under pagefault to it
33e0dfce    X ΔTail draftly done
12628943    X make sure "bye" is always processed immediately - even if a handleWatch is currently blocked
af0a64cb    X test for "bye" canceling blocked handlers
996dc6a8    X Fix race in test
43915fe9    X wcfs: Don't forbid simultaneous watch requests
941dc54b    X wcfs: threading.Lock -> sync.Mutex
d75b2304    X wcfs: Move _abort_ontimeout to pyx/nogil
79234659    X Notes on why eagier invalidation was rejected
f05271b1    X Test that sysread(/head/watch) can be interrupted
5ba816da    X restore test_wcfs_watch_robust after f05271b1.
4bd88564    X "Invalidation protocol" -> "Isolation protocol"
f7b54ca4    X avoid fmt::vsprintf  (now compils again with latest pygolang@master)
0a8fcd9d    X wcfs/client: Move EOF -> pygolang
153e02e6    X test_wcfs_watch_setup and test_wcfs_watch_setup_ahead work again
17f98edc    X wcfs: client: os: Factor syserr -> string into _sysErrString
7b0c301c    X wcfs: tests: Fix tFile.assertBlk not to segfault on a test failure
b74dda09    X Start switching Track from Track(key) to Track(keycov)
8b5d8523    X Move tracking of which blocks were accessed from wcfs to ΔFtail
parent 4430de41
/* Wendelin.bigfile | virtual memory tests
* Copyright (C) 2014-2019 Nexedi SA and Contributors.
* Copyright (C) 2014-2021 Nexedi SA and Contributors.
* Kirill Smelkov <kirr@nexedi.com>
*
* This program is free software: you can Use, Study, Modify and Redistribute
......@@ -338,7 +338,8 @@ void test_file_access_synthetic(void)
size_t PS, PSb;
int err;
/* MUST_FAULT(code) - checks that code faults */
/* MUST_FAULT(code) - checks that code faults */
/* somewhat dup in wcfs/internal/wcfs_test.pyx */
sigjmp_buf fault_jmp;
volatile int fault_expected = 0;
void sigfault_handler(int sig) {
......
......@@ -282,6 +282,12 @@ libvirtmem_h = [
'include/wendelin/utils.h',
]
libwcfs_h = [
'wcfs/client/wcfs.h',
'wcfs/client/wcfs_misc.h',
'wcfs/client/wcfs_watchlink.h',
]
setup(
name = 'wendelin.core',
version = '0.13',
......@@ -306,7 +312,13 @@ setup(
'lib/utils.c'],
depends = libvirtmem_h,
define_macros = [('_GNU_SOURCE',None)],
language = 'c')],
language = 'c'),
DSO('wendelin.wcfs.client.libwcfs',
['wcfs/client/wcfs.cpp',
'wcfs/client/wcfs_watchlink.cpp',
'wcfs/client/wcfs_misc.cpp'],
depends = libwcfs_h)],
ext_modules = [
PyGoExt('wendelin.bigfile._bigfile',
......@@ -319,6 +331,11 @@ setup(
language = 'c',
dsos = ['wendelin.bigfile.libvirtmem']),
PyGoExt('wendelin.wcfs.client._wcfs',
['wcfs/client/_wcfs.pyx'],
depends = libwcfs_h,
dsos = ['wendelin.wcfs.client.libwcfs']),
PyGoExt('wendelin.wcfs.internal.wcfs_test',
['wcfs/internal/wcfs_test.pyx']),
......
......@@ -60,6 +60,10 @@ from persistent import Persistent
from zodbtools.util import ashex as h
from six.moves.urllib.parse import urlsplit, urlunsplit
from .client._wcfs import \
PyWCFS as _WCFS, \
PyWatchLink as WatchLink \
# Server represents running wcfs server.
#
......@@ -79,7 +83,7 @@ class Server:
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
#
# WCFS logically mirrors ZODB.DB .
class WCFS:
class WCFS(_WCFS):
# .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
# ._njoin this connection was returned for so many joins
......
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# distutils: language=c++
# Package _wcfs provides Python-wrappers for C++ wcfs client package.
#
# It wraps WCFS and WatchLink.
from golang cimport chan, structZ, string, error, refptr
from golang cimport context
from libc.stdint cimport int64_t, uint64_t
from libcpp.utility cimport pair
from libcpp.vector cimport vector
cdef extern from "wcfs/client/wcfs_misc.h" namespace "zodb" nogil:
ctypedef uint64_t Tid
ctypedef uint64_t Oid
cdef extern from "wcfs/client/wcfs_misc.h" namespace "wcfs" nogil:
const Tid TidHead
# pyx/nogil description for C++ classes
cdef extern from "wcfs/client/wcfs_watchlink.h" namespace "wcfs" nogil:
cppclass _WatchLink:
error close()
error closeWrite()
pair[string, error] sendReq(context.Context ctx, const string &req)
error recvReq(context.Context ctx, PinReq *prx)
error replyReq(context.Context ctx, const PinReq *req, const string& reply);
vector[string] fatalv
chan[structZ] rx_eof
cppclass WatchLink (refptr[_WatchLink]):
# WatchLink.X = WatchLink->X in C++
error close "_ptr()->close" ()
error closeWrite "_ptr()->closeWrite"()
pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req)
error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx)
error replyReq "_ptr()->replyReq" (context.Context ctx, const PinReq *req, const string& reply);
vector[string] fatalv "_ptr()->fatalv"
chan[structZ] rx_eof "_ptr()->rx_eof"
cppclass PinReq:
Oid foid
int64_t blk
Tid at
string msg
error _twlinkwrite(WatchLink wlink, const string& pkt)
cdef extern from "wcfs/client/wcfs.h" namespace "wcfs" nogil:
cppclass WCFS:
string mountpoint
pair[WatchLink, error] _openwatch()
# ---- python bits ----
cdef class PyWCFS:
cdef WCFS wc
cdef class PyWatchLink:
cdef WatchLink wlink
cdef class PyPinReq:
cdef PinReq pinreq
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# cython: auto_pickle=False
# distutils: language=c++
# Package _wcfs provides Python-wrappers for C++ wcfs client package.
# See _wcfs.pxd for package overview.
from golang cimport pychan, pyerror, nil
from golang cimport io
from ZODB.utils import p64
cdef class PyWCFS:
property mountpoint:
def __get__(PyWCFS pywc):
return pywc.wc.mountpoint
def __set__(PyWCFS pywc, string v):
pywc.wc.mountpoint = v
cdef class PyWatchLink:
def __init__(PyWatchLink pywlink, PyWCFS pywc):
with nogil:
_ = wcfs_openwatch_pyexc(&pywc.wc)
pywlink.wlink = _.first
err = _.second
if err != nil:
raise pyerr(err)
def __dealloc__(PyWatchLink pywlink):
pywlink.wlink = nil
def close(PyWatchLink pywlink):
with nogil:
err = wlink_close_pyexc(pywlink.wlink)
if err != nil:
raise pyerr(err)
def closeWrite(PyWatchLink pywlink):
with nogil:
err = wlink_closeWrite_pyexc(pywlink.wlink)
if err != nil:
raise pyerr(err)
def sendReq(PyWatchLink pywlink, context.PyContext pyctx, string req): # -> reply(string)
with nogil:
_ = wlink_sendReq_pyexc(pywlink.wlink, pyctx.ctx, req)
reply = _.first
err = _.second
if err != nil:
raise pyerr(err)
return reply
def recvReq(PyWatchLink pywlink, context.PyContext pyctx): # -> PinReq | None when EOF
cdef PyPinReq pyreq = PyPinReq.__new__(PyPinReq)
with nogil:
err = wlink_recvReq_pyexc(pywlink.wlink, pyctx.ctx, &pyreq.pinreq)
if err.eq(io.EOF):
return None
if err != nil:
raise pyerr(err)
return pyreq
def replyReq(PyWatchLink pywlink, context.PyContext pyctx, PyPinReq pyreq, string reply):
with nogil:
err = wlink_replyReq_pyexc(pywlink.wlink, pyctx.ctx, &pyreq.pinreq, reply)
if err != nil:
raise pyerr(err)
return
# XXX for tests
property fatalv:
def __get__(PyWatchLink pywlink):
return pywlink.wlink.fatalv
property rx_eof:
def __get__(PyWatchLink pywlink):
return pychan.from_chan_structZ(pywlink.wlink.rx_eof)
cdef class PyPinReq:
property foid:
def __get__(PyPinReq pypin):
return p64(pypin.pinreq.foid)
property blk:
def __get__(PyPinReq pypin):
return pypin.pinreq.blk
property at:
def __get__(PyPinReq pypin):
at = pypin.pinreq.at
if at == TidHead:
return None
return p64(at)
# wcfs_test.py uses req.msg in several places
property msg:
def __get__(PyPinReq pypin):
return pypin.pinreq.msg
def _tpywlinkwrite(PyWatchLink pywlink, bytes pypkt):
cdef string pkt = pypkt
with nogil:
err = _twlinkwrite_pyexc(pywlink.wlink, pkt)
if err != nil:
raise pyerr(err)
# ---- misc ----
# pyerr converts error into python error.
cdef object pyerr(error err):
return pyerror.from_error(err)
from golang cimport topyexc
cdef nogil:
pair[WatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc:
return wcfs._openwatch()
error wlink_close_pyexc(WatchLink wlink) except +topyexc:
return wlink.close()
error wlink_closeWrite_pyexc(WatchLink wlink) except +topyexc:
return wlink.closeWrite()
pair[string, error] wlink_sendReq_pyexc(WatchLink wlink, context.Context ctx, const string &req) except +topyexc:
return wlink.sendReq(ctx, req)
error wlink_recvReq_pyexc(WatchLink wlink, context.Context ctx, PinReq *prx) except +topyexc:
return wlink.recvReq(ctx, prx)
error wlink_replyReq_pyexc(WatchLink wlink, context.Context ctx, const PinReq *req, const string& reply) except +topyexc:
return wlink.replyReq(ctx, req, reply)
error _twlinkwrite_pyexc(WatchLink wlink, const string& pkt) except +topyexc:
return _twlinkwrite(wlink, pkt)
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package wcfs provides WCFS client.
#include "wcfs_misc.h"
#include "wcfs.h"
#include "wcfs_watchlink.h"
#include <golang/errors.h>
#include <golang/fmt.h>
// wcfs::
namespace wcfs {
// ---- WCFS raw file access ----
// _path returns path for object on wcfs.
// - str: wcfs root + obj;
string WCFS::_path(const string &obj) {
WCFS& wc = *this;
return wc.mountpoint + "/" + obj;
}
tuple<os::File, error> WCFS::_open(const string &path, int flags) {
WCFS& wc = *this;
string path_ = wc._path(path);
return os::open(path_, flags);
}
// ---- misc ----
string WCFS::String() const {
const WCFS& wc = *this;
return fmt::sprintf("wcfs %s", v(wc.mountpoint));
}
} // wcfs::
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package wcfs provides WCFS client.
#ifndef _NXD_WCFS_H_
#define _NXD_WCFS_H_
#include <golang/libgolang.h>
#include <tuple>
#include "wcfs_misc.h"
// wcfs::
namespace wcfs {
using namespace golang;
using std::tuple;
using std::pair;
typedef refptr<struct _WatchLink> WatchLink;
struct PinReq;
// WCFS represents filesystem-level connection to wcfs server.
//
// Use wcfs.join in Python API to create it.
//
// WCFS logically mirrors ZODB.DB .
// It is safe to use WCFS from multiple threads simultaneously.
struct WCFS {
string mountpoint;
pair<WatchLink, error> _openwatch();
string String() const;
// at OS-level, on-WCFS raw files can be accessed via ._path and ._open.
string _path(const string &obj);
tuple<os::File, error> _open(const string &path, int flags=O_RDONLY);
};
} // wcfs::
#endif
// Copyright (C) 2019-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
#include "wcfs_misc.h"
#include <golang/libgolang.h>
#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>
using namespace golang;
#include <inttypes.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include <memory>
// golang::
namespace golang {
// os::
namespace os {
// TODO -> os.PathError + err=syscall.Errno
static error _pathError(const char *op, const string &path, int syserr);
static string _sysErrString(int syserr);
int _File::fd() const { return _fd; }
string _File::name() const { return _path; }
_File::_File() {}
_File::~_File() {}
void _File::decref() {
if (__decref())
delete this;
}
tuple<File, error> open(const string &path, int flags, mode_t mode) {
int fd = ::open(path.c_str(), flags, mode);
if (fd == -1)
return make_tuple(nil, _pathError("open", path, errno));
File f = adoptref(new _File);
f->_path = path;
f->_fd = fd;
return make_tuple(f, nil);
}
error _File::close() {
_File& f = *this;
int err = ::close(f._fd);
if (err != 0)
return f._errno("close");
f._fd = -1;
return nil;
}
tuple<int, error> _File::read(void *buf, size_t count) {
_File& f = *this;
int n;
n = ::read(f._fd, buf, count);
if (n == 0)
return make_tuple(n, io::EOF_);
if (n < 0)
return make_tuple(0, f._errno("read"));
return make_tuple(n, nil);
}
tuple <int, error> _File::write(const void *buf, size_t count) {
_File& f = *this;
int n, wrote=0;
// NOTE contrary to write(2) we have to write all data as io.Writer requires.
while (count != 0) {
n = ::write(f._fd, buf, count);
if (n < 0)
return make_tuple(wrote, f._errno("write"));
wrote += n;
buf = ((const char *)buf) + n;
count -= n;
}
return make_tuple(wrote, nil);
}
error _File::stat(struct stat *st) {
_File& f = *this;
int err = fstat(f._fd, st);
if (err != 0)
return f._errno("stat");
return nil;
}
// _errno returns error corresponding to op(file) and errno.
error _File::_errno(const char *op) {
_File& f = *this;
return _pathError(op, f._path, errno);
}
// _pathError returns os.PathError-like for op/path and system error
// indicated by syserr.
static error _pathError(const char *op, const string &path, int syserr) {
// TODO v(_sysErrString(syserr)) -> v(syscall.Errno(syserr))
return fmt::errorf("%s %s: %s", op, v(path), v(_sysErrString(syserr)));
}
// _sysErrString returns string corresponding to system error syserr.
static string _sysErrString(int syserr) {
char ebuf[128];
char *estr = strerror_r(syserr, ebuf, sizeof(ebuf));
return string(estr);
}
} // os::
// xstrconv:: (strconv-like)
namespace xstrconv {
// parseHex64 decodes 16-character-wide hex-encoded string into uint64.
tuple<uint64_t, error> parseHex64(const string& s) {
if (s.size() != 16)
return make_tuple(0, fmt::errorf("hex64 %s invalid", v(s)));
uint64_t v;
int n = sscanf(s.c_str(), "%16" SCNx64, &v);
if (n != 1)
return make_tuple(0, fmt::errorf("hex64 %s invalid", v(s)));
return make_tuple(v, nil);
}
// parseInt decodes string s as signed decimal integer.
tuple<int64_t, error> parseInt(const string& s) {
int64_t v;
int n = sscanf(s.c_str(), "%" SCNi64, &v);
if (!(n == 1 && std::to_string(v) == s))
return make_tuple(0, fmt::errorf("int %s invalid", v(s)));
return make_tuple(v, nil);
}
// parseUint decodes string s as unsigned decimal integer.
tuple<uint64_t, error> parseUint(const string& s) {
uint64_t v;
int n = sscanf(s.c_str(), "%" SCNu64, &v);
if (!(n == 1 && std::to_string(v) == s))
return make_tuple(0, fmt::errorf("uint %s invalid", v(s)));
return make_tuple(v, nil);
}
} // xstrconv::
} // golang::
// xerr::
namespace xerr {
// XXX don't require fmt::vsprintf
#if 0
Contextf::Contextf(const char *format, ...) {
Contextf& c = *this;
va_list argp;
va_start(argp, format);
c.errctx = fmt::sprintfv(format, argp);
va_end(argp);
}
#endif
error Contextf::operator() (error err) const {
const Contextf& c = *this;
if (err == nil)
return nil;
return fmt::errorf("%s: %w", v(c.errctx), err);
}
} // xerr::
#include <golang/time.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/syscall.h>
// golang::log::
namespace golang {
namespace log {
void __Logf(const char *file, int line, char level, const char *format, ...) {
double t = time::now();
time_t t_int = time_t(t);
struct tm tm_loc;
localtime_r(&t_int, &tm_loc);
char t_buf[32];
strftime(t_buf, sizeof(t_buf), "%m%d %H:%M:%S", &tm_loc);
int t_us = int((t-t_int)*1E6);
pid_t tid = syscall(SYS_gettid);
string prefix = fmt::sprintf("%c%s.%06d % 7d %s:%d] ", level, t_buf, t_us, tid, file, line);
// TODO better to emit prefix and msg in one go.
flockfile(stderr);
fprintf(stderr, "%s", v(prefix));
va_list argp;
va_start(argp, format);
vfprintf(stderr, format, argp);
va_end(argp);
fprintf(stderr, "\n");
funlockfile(stderr);
}
}} // golang::log::
// wcfs::
namespace wcfs {
template<> string v_(error err) {
return (err != nil) ? err->Error() : "nil";
}
static string h016(uint64_t v) { return fmt::sprintf("%016lx", v); }
template<> string v_(const zodb::Tid& tid) { return h016(tid); }
//template<> string v_(zodb::Oid oid) { return h016(oid); }
// XXX Tid and Oid are typedefs for uint64_t and C++ reduces template
// specializations to the underlying type. This providing specialization for
// both Tid and Oid results in "multiple definition" error.
} // wcfs::
// Copyright (C) 2019-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// wcfs_misc.{h,cpp} provide miscellaneous utilities for other wcfs_* files.
#ifndef _NXD_WCFS_MISC_H_
#define _NXD_WCFS_MISC_H_
// XXX hack: C++ does not have __builtin_types_compatible_p, but CCAN configure
// thinks it does because CCAN is configured via C, not C++.
#include <config.h>
#undef HAVE_BUILTIN_TYPES_COMPATIBLE_P
#define HAVE_BUILTIN_TYPES_COMPATIBLE_P 0
#include <ccan/array_size/array_size.h>
#include <stddef.h>
#include <stdint.h>
#include <golang/libgolang.h>
using namespace golang;
#include <string>
using std::string;
#include <utility>
using std::pair;
using std::make_pair;
#include <tuple>
using std::tuple;
using std::make_tuple;
using std::tie;
#include <vector>
using std::vector;
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// golang::
namespace golang {
// os::
namespace os {
// os::File mimics os.File from Go.
// its operations return error with full file context.
typedef refptr<class _File> File;
class _File : public object {
int _fd;
string _path;
// don't new - create via open
private:
_File();
~_File();
friend tuple<File, error> open(const string &path, int flags, mode_t mode);
public:
void decref();
public:
int fd() const;
string name() const;
error close();
// read implements io.Reader from Go: it reads into buf up-to count bytes.
// XXX buf -> slice<byte> ?
tuple<int, error> read(void *buf, size_t count);
// write implements io.Writer from Go: it writes all data from buf.
//
// NOTE write behaves like io.Writer in Go - it tries to write as much
// bytes as requested, and if it could write only less - it returns error.
// XXX buf -> slice<byte> ?
tuple<int, error> write(const void *buf, size_t count);
error stat(struct stat *st);
private:
error _errno(const char *op);
};
// open opens file @path.
tuple<File, error> open(const string &path, int flags = O_RDONLY,
mode_t mode = S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
} // os::
// ---- misc ----
// xstrconv::
namespace xstrconv {
tuple<uint64_t, error> parseHex64(const string& s);
tuple<int64_t, error> parseInt(const string& s);
tuple<uint64_t, error> parseUint(const string& s);
} // xstrconv::
// log::
namespace log {
#define Debugf(format, ...) __Logf(__FILE__, __LINE__, 'D', format, ##__VA_ARGS__)
#define Infof(format, ...) __Logf(__FILE__, __LINE__, 'I', format, ##__VA_ARGS__)
#define Warnf(format, ...) __Logf(__FILE__, __LINE__, 'W', format, ##__VA_ARGS__)
#define Errorf(format, ...) __Logf(__FILE__, __LINE__, 'E', format, ##__VA_ARGS__)
#define Fatalf(format, ...) __Logf(__FILE__, __LINE__, 'F', format, ##__VA_ARGS__)
void __Logf(const char *file, int line, char level, const char *format, ...);
} // log::
} // golang::
// zodb::
namespace zodb {
typedef uint64_t Tid;
typedef uint64_t Oid;
} // zodb::
#include <golang/fmt.h>
// xerr::
namespace xerr {
// xerr::Contextf mimics xerr.Contextf from Go.
//
// Usage is a bit different(*) compared to Go:
//
// func doSomething(arg) {
// xerr.Contextf E("doing something %s", v(arg));
// ...
// return E(err);
// }
//
// (*) because C++ does not allow to modify returned value on the fly.
class Contextf {
string errctx;
public:
template<typename ...Argv>
inline Contextf(const char *format, Argv... argv) {
// XXX string() to avoid "error: format not a string literal" given by -Werror=format-security
errctx = fmt::sprintf(string(format), argv...);
}
error operator() (error) const;
};
} // xerr::
// wcfs::
namespace wcfs {
// TidHead is invalid Tid which is largest Tid value and means @head.
const zodb::Tid TidHead = -1ULL;
// v mimics %v for T to be used in printf & friends.
//
// NOTE returned char* pointer is guaranteed to stay valid only till end of
// current expression. For example
//
// printf("hello %s", v(obj))
//
// is valid, while
//
// x = v(obj);
// use(x);
//
// is not valid.
#define v(obj) (wcfs::v_(obj).c_str())
template<typename T> string v_(T* obj) { return obj->String(); }
template<typename T> string v_(const T* obj) { return obj->String(); }
template<typename T> string v_(const T& obj) { return obj.String(); }
template<typename T> string v_(refptr<T> obj) { return obj->String(); }
template<> inline string v_(const string& s) { return s; }
template<> string v_(error);
template<> string v_(const zodb::Tid&);
template<> string v_(const zodb::Oid&);
} // wcfs::
#endif
This diff is collapsed.
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// wcfs_watchlink provides WatchLink class that implements message exchange
// over /head/watch on wcfs.
#ifndef _NXD_WCFS_WATCHLINK_H_
#define _NXD_WCFS_WATCHLINK_H_
#include <golang/libgolang.h>
#include <golang/context.h>
#include <golang/cxx.h>
#include <golang/sync.h>
using namespace golang;
using cxx::dict;
using cxx::set;
#include "wcfs.h"
#include "wcfs_misc.h"
// wcfs::
namespace wcfs {
struct PinReq;
// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;
// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
// stream over which the data was received
StreamID stream;
// raw data received/to-be-sent.
// XXX not e.g. string, as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit rx line length to
// avoid DoS, but just for DoS the limit would be higher.
uint16_t datalen;
char data[256 - sizeof(StreamID) - sizeof(uint16_t)];
error from_string(const string& rx);
string to_string() const;
};
static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low for long error message
// WatchLink represents /head/watch link opened on wcfs.
//
// It is created by WCFS._openwatch().
//
// .sendReq()/.recvReq() provides raw IO in terms of wcfs isolation protocol messages.
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public object {
WCFS *_wc;
os::File _f; // head/watch file handle
string _rxbuf; // buffer for data already read from _f
// iso.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
bool _down; // y when the link is no-longer operational
bool _rxeof; // y if EOF was received from server
dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
StreamID _req_next; // stream ID for next client-originated request TODO -> atomic
sync::Mutex _txmu; // serializes writes
sync::Once _txclose1;
sync::WorkGroup _serveWG; // _serveRX is running under _serveWG
func<void()> _serveCancel;
// XXX for tests
public:
vector<string> fatalv; // ad-hoc, racy. TODO rework to send messages to control channel
chan<structZ> rx_eof; // becomes ready when wcfs closes its tx side
// don't new - create only via WCFS._openwatch()
private:
_WatchLink();
virtual ~_WatchLink();
friend pair<WatchLink, error> WCFS::_openwatch();
public:
void incref();
void decref();
public:
error close();
error closeWrite();
pair<string, error> sendReq(context::Context ctx, const string &req);
error recvReq(context::Context ctx, PinReq *rx_into);
error replyReq(context::Context ctx, const PinReq *req, const string& reply);
string String() const;
int fd() const;
private:
error _serveRX(context::Context ctx);
tuple<string, error> _readline();
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
StreamID _nextReqID();
tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req);
friend error _twlinkwrite(WatchLink wlink, const string &pkt);
};
// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
struct PinReq {
StreamID stream; // request was received with this stream ID
zodb::Oid foid; // request is about this file
int64_t blk; // ----//---- about this block
zodb::Tid at; // pin to this at; TidHead means unpin to head
string msg; // XXX raw message for tests (TODO kill)
};
// for testing
error _twlinkwrite(WatchLink wlink, const string &pkt);
} // wcfs::
#endif
......@@ -144,6 +144,18 @@ def map_ro(int fd, off_t offset, size_t size):
return <unsigned char[:size:1]>addr
# map_into_ro is similar to map_ro, but mmaps fd[offset:...] into mem's memory.
def map_into_ro(unsigned char[::1] mem not None, int fd, off_t offset):
cdef void *addr = &mem[0]
cdef size_t size = mem.shape[0]
addr = mman.mmap(addr, size, mman.PROT_READ, mman.MAP_FIXED |
mman.MAP_SHARED, fd, offset)
if addr == mman.MAP_FAILED:
PyErr_SetFromErrno(OSError)
return
# unmap unmaps memory covered by mem.
def unmap(const unsigned char[::1] mem not None):
cdef const void *addr = &mem[0]
......
......@@ -23,15 +23,140 @@
"""Module wcfs_test.pyx complements wcfs_test.py with things that cannot be
implemented in Python."""
from posix.signal cimport sigaction, sigaction_t, siginfo_t, SA_SIGINFO
from libc.signal cimport SIGBUS
from posix.signal cimport sigaction, sigaction_t, siginfo_t, SA_SIGINFO, sigemptyset
from libc.signal cimport SIGBUS, SIGSEGV
from libc.setjmp cimport sigjmp_buf, sigsetjmp, siglongjmp
from libc.stdlib cimport abort
from libc.string cimport strlen
from posix.unistd cimport write, sleep
from posix.types cimport off_t
from cpython.exc cimport PyErr_SetFromErrno
from golang cimport panic
from golang cimport chan, pychan, select, panic, topyexc, cbool
from golang cimport sync, time
# _tWCFS is pyx part of tWCFS.
cdef class _tWCFS:
cdef readonly pychan _closed # chan[structZ]
cdef readonly pychan _wcfuseaborted # chan[structZ]
def __cinit__(_tWCFS t):
t._closed = pychan(dtype='C.structZ')
t._wcfuseaborted = pychan(dtype='C.structZ')
# _abort_ontimeout sends abort to fuse control file if timeout happens
# before tDB is closed.
#
# It runs without GIL to avoid deadlock: e.g. if a code that is
# holding GIL will access wcfs-mmapped memory, and wcfs will send pin,
# but pin handler is failing one way or another - select will wake-up
# but, if _abort_ontimeout uses GIL, won't continue to run trying to lock
# GIL -> deadlock.
def _abort_ontimeout(_tWCFS t, int fdabort, double dt, pychan nogilready not None):
cdef chan[double] timeoutch = time.after(dt)
emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second)
cdef char *_emsg1 = emsg1
with nogil:
# tell main thread that we entered nogil world
nogilready.chan_structZ().close()
t.__abort_ontimeout(dt, timeoutch, fdabort, _emsg1)
cdef void __abort_ontimeout(_tWCFS t, double dt, chan[double] timeoutch,
int fdabort, const char *emsg1) nogil except +topyexc:
_ = select([
timeoutch.recvs(), # 0
t._closed.chan_structZ().recvs(), # 1
])
if _ == 1:
return # tDB closed = testcase completed
# timeout -> force-umount wcfs
writeerr(emsg1)
writeerr("-> aborting wcfs fuse connection to unblock ...\n\n")
xwrite(fdabort, b"1\n")
t._wcfuseaborted.chan_structZ().close()
# read_exfault_nogil reads mem with GIL released and returns its content.
#
# If reading hits segmentation fault, it is converted to SegmentationFault exception.
class SegmentationFault(Exception): pass
cdef sync.Mutex exfaultMu # one at a time as sigaction is per-process
cdef sigjmp_buf exfaultJmp
cdef cbool faulted
def read_exfault_nogil(const unsigned char[::1] mem not None) -> bytes:
assert len(mem) == 1, "read_exfault_nogil: only [1] mem is supported for now"
cdef unsigned char b
global faulted
cdef cbool faulted_
# somewhat dup of MUST_FAULT in test_virtmem.c
with nogil:
exfaultMu.lock()
faulted = False
try:
with nogil:
b = _read_exfault(&mem[0])
finally:
faulted_ = faulted
with nogil:
exfaultMu.unlock()
if faulted_:
raise SegmentationFault()
return bytes(bytearray([b]))
cdef void exfaultSighand(int sig) nogil:
# return from sighandler to proper place with faulted=True
global faulted
faulted = True
siglongjmp(exfaultJmp, 1)
cdef unsigned char _read_exfault(const unsigned char *p) nogil except +topyexc:
global faulted
cdef sigaction_t act, saveact
act.sa_handler = exfaultSighand
act.sa_flags = 0
err = sigemptyset(&act.sa_mask)
if err != 0:
panic("sigemptyset: failed")
err = sigaction(SIGSEGV, &act, &saveact)
if err != 0:
panic("sigaction SIGSEGV -> exfaultSighand: failed")
b = 0xff
if sigsetjmp(exfaultJmp, 1) == 0:
b = p[0] # should pagefault -> sighandler does longjmp
else:
# faulted
if not faulted:
panic("faulted, but !faulted")
err = sigaction(SIGSEGV, &saveact, NULL)
if err != 0:
panic("sigaction SIGSEGV <- restore: failed")
return b
# --------
cdef extern from "<fcntl.h>" nogil:
int posix_fadvise(int fd, off_t offset, off_t len, int advice);
enum: POSIX_FADV_DONTNEED
# fadvise_dontneed tells the kernel that file<fd>[offset +len) is not needed.
#
# see fadvise(2) for details.
def fadvise_dontneed(int fd, off_t offset, off_t len):
cdef int err = posix_fadvise(fd, offset, len, POSIX_FADV_DONTNEED)
if err:
PyErr_SetFromErrno(OSError)
# ---- signal handling ----
# TODO -> golang.signal ?
......
......@@ -140,7 +140,9 @@ package xbtree
// from that job, it first waits for corresponding job(s) to complete.
//
// Explained rebuild organization allows non-overlapping queries/track-requests
// to run simultaneously.
// to run simultaneously. This property is essential to WCFS because otherwise
// WCFS would not be able to serve several non-overlapping READ requests to one
// file in parallel.
//
// --------
//
......
......@@ -85,7 +85,9 @@ package zdata
// Track/queries requests for long.
//
// Combined this organization allows non-overlapping queries/track-requests
// to run simultaneously.
// to run simultaneously. This property is essential to WCFS because otherwise
// WCFS would not be able to serve several non-overlapping READ requests to one
// file in parallel.
//
// See also "Concurrency" in ΔBtail organization for more details.
......
......@@ -25,6 +25,8 @@ import (
"fmt"
"io"
"math"
"strconv"
"strings"
"sync/atomic"
"syscall"
......@@ -35,6 +37,8 @@ import (
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// ---- FUSE ----
......@@ -105,7 +109,7 @@ func err2LogStatus(err error) fuse.Status {
// from any single node will make the kernel think that the filesystem does not
// support Open at all.
//
// In wcfs we have dynamic files (e.g. upcoming /head/watch) and this way we have to
// In wcfs we have dynamic files (e.g. /head/watch) and this way we have to
// avoid returning ENOSYS on nodes, that do not need file handles.
//
// fsNode is like nodefs.defaultNode, but by default Open returns to kernel
......@@ -424,6 +428,67 @@ func (f *skFile) Release() {
}
// ---- parsing ----
// parseWatchFrame parses line going through /head/watch into (stream, msg)
//
// <stream> <msg...>
func parseWatchFrame(line string) (stream uint64, msg string, err error) {
sp := strings.IndexByte(line, ' ')
if sp == -1 {
return 0, "", fmt.Errorf("invalid frame: %q", line)
}
stream, err = strconv.ParseUint(line[:sp], 10, 64)
if err != nil {
return 0, "", fmt.Errorf("invalid frame: %q (invalid stream)", line)
}
msg = strings.TrimSuffix(line[sp+1:], "\n")
return stream, msg, nil
}
// parseWatch parses watch request wcfs received over /head/watch.
//
// watch <file> (@<at>|-)
//
// at="-" is returned as zodb.InvalidTid .
func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) {
defer func() {
if err != nil {
oid = zodb.InvalidOid
at = zodb.InvalidTid
err = fmt.Errorf("bad watch: %s", err)
}
}()
if !strings.HasPrefix(msg, "watch ") {
return 0, 0, fmt.Errorf("not a watch request: %q", msg)
}
argv := strings.Split(msg[len("watch "):], " ")
if len(argv) != 2 {
return 0, 0, fmt.Errorf("expected 2 arguments, got %d", len(argv))
}
oid, err = zodb.ParseOid(argv[0])
if err != nil {
return 0, 0, fmt.Errorf("invalid oid")
}
switch {
case argv[1] == "-":
at = zodb.InvalidTid
case strings.HasPrefix(argv[1], "@"):
at, err = zodb.ParseTid(argv[1][1:])
default:
err = fmt.Errorf("x") // just anything
}
if err != nil {
return 0, 0, fmt.Errorf("invalid at")
}
return oid, at, nil
}
// ---- make df happy (else it complains "function not supported") ----
func (root *Root) StatFs() *fuse.StatfsOut {
......
......@@ -39,6 +39,143 @@ part of Linux 5.2:
https://git.kernel.org/linus/ad2ba64dd489
Invalidations to wcfs clients are delayed until block access
============================================================
Initially it was planned that wcfs would send invalidation messages to its
clients right after receiving invalidation message from ZODB at transaction
boundary time. That simplifies logic but requires that for a particular file,
wcfs has to send to clients whole range of where the file was changed.
Emitting whole δR right at transaction-boundary time requires to keep whole
ZBigFile.blktab index in RAM. Even though from space point of view it is
somewhat acceptable (~ 0.01% of whole-file data size, i.e. ~ 128MB of index for
~ 1TB of data), it is not good from time overhead point of view - initial open
of a file this way would be potentially slow as full blktab scan - including
Trees _and_ Buckets nodes - would be required.
-> we took the approach where we send invalidation to client about a block
lazily only when the block is actually accessed.
Rejected alternative:
Building δFtail lazily along serving FUSE reads during scope of one
transaction is not trivial and would create concurrency bottlenecks if simple
locking scheme is used. With the main difficulty being to populate tracking set
of δBtree lazily. However as the first approach we could still build complete
tracking set for a BTree at the time of file open: we need to scan through all
trees but _not_ buckets: this way we'll know oid of all tree nodes: trees _and_
buckets, while avoiding loading buckets makes this approach practical: with
default LOBTree settings (1 bucket = 60·objects, 1 tree = 500·buckets) it will
require ~ 20 trees to cover 1TB of data. And we can scan those trees very
quickly even if doing so serially. For 1PB of data it will require to scan ~
10⁴ trees. If RTT to load 1 object is ~1ms this will become 10 seconds if done
serially. However if we load all those tree objects in parallel it will be
much less. Still the number of trees to scan is linear to the amount of data.
-> rejected: ΔFtail and ΔBtail were instead fixed to allow several Track and
queries requests to run in parallel. See "Concurrency" section in ΔFtail/ΔBtail
organization overview.
Changing mmapping while under pagefault is possible
===================================================
We can change a mapping while a page from it is under pagefault:
- the kernel, upon handling pagefault, queues read request to filesystem
server. As of Linux 4.20 this is done _with_ holding client->mm->mmap_sem:
kprobe:fuse_readpages (client->mm->mmap_sem.count: 1)
fuse_readpages+1
read_pages+109
__do_page_cache_readahead+401
filemap_fault+635
__do_fault+31
__handle_mm_fault+3403
handle_mm_fault+220
__do_page_fault+598
page_fault+30
- however the read request is queued to be performed asynchronously -
the kernel does not wait for it in fuse_readpages, because
* git.kernel.org/linus/c1aa96a5,
* git.kernel.org/linus/9cd68455,
* and go-fuse initially negotiating CAP_ASYNC_READ to the kernel.
- the kernel then _releases_ client->mm->mmap_sem and then waits
for to-read pages to become ready:
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n2411
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n2457
* https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/filemap.c?id=v4.20-rc3-83-g06e68fed3282#n1301
- the filesystem server, upon receiving the read request, can manipulate
client's address space. This requires to write-lock client->mm->mmap_sem,
but we can be sure it won't deadlock because the kernel releases it
before waiting (see previous point).
in practice the manipulation is done by another client thread, because
on Linux it is not possible to change mm of another process. However
the main point here is that the manipulation is possible because
there will be no deadlock on client->mm->mmap_sem.
For the reference here is how filesystem server reply looks under trace:
kprobe:fuse_readpages_end
fuse_readpages_end+1
request_end+188
fuse_dev_do_write+1921
fuse_dev_write+78
do_iter_readv_writev+325
do_iter_write+128
vfs_writev+152
do_writev+94
do_syscall_64+85
entry_SYSCALL_64_after_hwframe+68
and a test program that demonstrates that it is possible to change
mmapping while under pagefault to it:
https://lab.nexedi.com/kirr/go-fuse/commit/f822c9db
Starting from Linux 5.1 mmap_sem should be generally released while doing any IO:
https://git.kernel.org/linus/6b4c9f4469
but before that the analysis remains FUSE-specific.
The property that changing mmapping while under pagefault is possible is
verified by wcfs testsuite in `test_wcfs_remmap_on_pin` test.
Client cannot be ptraced while under pagefault
==============================================
We cannot use ptrace to run code on client thread that is under pagefault:
The kernel sends SIGSTOP to interrupt tracee, but the signal will be
processed only when the process returns from kernel space, e.g. here
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/x86/entry/common.c?id=v4.19-rc8-151-g23469de647c4#n160
This way the tracer won't receive obligatory information that tracee
stopped (via wait...) and even though ptrace(ATTACH) succeeds, all other
ptrace commands will fail:
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/kernel/ptrace.c?id=v4.19-rc8-151-g23469de647c4#n1140
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/kernel/ptrace.c?id=v4.19-rc8-151-g23469de647c4#n207
My original idea was to use ptrace to run code in process to change it's
memory mappings, while the triggering process is under pagefault/read
to wcfs, and the above shows it won't work - trying to ptrace the
client from under wcfs will just block forever (the kernel will be
waiting for read operation to finish for ptrace, and read will be first
waiting on ptrace stopping to complete = deadlock)
Kernel locks page on read/cache store/... - we have to be careful not to deadlock
=================================================================================
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2019-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program wcfs_readcancel is helper for wcfs_test to verify that
sysread(/head/watch) is unblocked and canceled when kernel asks WCFS to cancel
that read request.
Without proper FUSE INTERRUPT handling on WCFS side, such reads are not
cancelled, which results in processes that were aborted or even `kill-9`ed being
stuck forever waiting for WCFS to release them.
"""
from __future__ import print_function, absolute_import
from golang import select, default
from golang import context, sync, time
import os, sys
def main():
wcfs_root = sys.argv[1]
f = open("%s/head/watch" % wcfs_root)
wg = sync.WorkGroup(context.background())
def _(ctx):
data = f.read() # should block forever
raise AssertionError("read: woken up: data=%r" % data)
wg.go(_)
def _(ctx):
time.sleep(100*time.millisecond)
_, _rx = select(
default, # 0
ctx.done().recv, # 1
)
if _ == 1:
raise ctx.err()
os._exit(0)
wg.go(_)
wg.wait()
raise AssertionError("should be unreachable")
if __name__ == '__main__':
main()
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment