Commit d75b2304 authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: Move _abort_ontimeout to pyx/nogil

parent 5e60ad89
......@@ -17,10 +17,9 @@
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from setuptools import setup, Extension, Command, find_packages
from golang.pyx.build import setup, Extension as PyGoExt, build_ext as _build_ext
from setuptools import Extension, Command, find_packages
from setuptools.command.build_py import build_py as _build_py
from setuptools.command.build_ext import build_ext as _build_ext
from Cython.Build import cythonize # XXX -> golang.pyx.build.Extension ?
from pkg_resources import working_set, EntryPoint
from distutils.errors import DistutilsExecError
from subprocess import Popen, PIPE
......@@ -230,7 +229,18 @@ setup(
keywords = 'bigdata out-of-core numpy virtual-memory',
ext_modules = [_bigfile] + cythonize("wcfs/internal/*.pyx"),
ext_modules = [
_bigfile,
PyGoExt('wcfs.internal.wcfs_test',
['wcfs/internal/wcfs_test.pyx']),
Extension('wcfs.internal.io',
['wcfs/internal/io.pyx']),
Extension('wcfs.internal.mm',
['wcfs/internal/mm.pyx']),
],
package_dir = {'wendelin': ''},
packages = ['wendelin'] + ['wendelin.%s' % _ for _ in
......
......@@ -179,7 +179,7 @@ def _pinner(wconn, ctx):
if exc in (None, context.canceled): # canceled = .close asks pinner to stop
return
log.critical('pinner failed:', exc_info=1)
print('CRITICAL: pinned failed:', file=sys.stderr)
print('CRITICAL: pinner failed:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
print('\nCRITICAL: wcfs server will likely kill us soon.', file=sys.stderr)
defer(_)
......
/io.c
/mm.c
/wcfs_test.c
/wcfs_test.cpp
......@@ -18,8 +18,9 @@
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# distutils: language=c++
"""Module wcfs_test complements wcfs_test.py with things that are impossible to implement in Python"""
"""Module wcfs_test complements wcfs_test.py with things that are not to implement in Python"""
from posix.signal cimport sigaction, sigaction_t, siginfo_t, SA_SIGINFO
from libc.signal cimport SIGBUS
......@@ -29,11 +30,51 @@ from posix.unistd cimport write, sleep
from posix.types cimport off_t
from cpython.exc cimport PyErr_SetFromErrno
#from cpython.pystate cimport PyGILState_Ensure, PyGILState_Release, PyGILState_STATE
# XXX -> cpython.lifecycle? .run ?
cdef extern from "Python.h":
int PyRun_SimpleString(const char *)
from golang cimport pychan, select, panic, topyexc
from golang import time
# _tDB is pyx part of tDB.
cdef class _tDB:
cdef readonly pychan _closed # chan[structZ]
cdef readonly pychan _wcfuseaborted # chan[structZ]
def __cinit__(_tDB t):
t._closed = pychan(dtype='C.structZ')
t._wcfuseaborted = pychan(dtype='C.structZ')
# _abort_ontimeout sends abort to fuse control file if timeout happens
# before tDB is closed.
#
# It runs without GIL to avoid deadlock: e.g. if a code that is
# holding GIL will access wcfs-mmapped memory, and wcfs will send pin,
# but pin handler is failing one way or another - select will wake-up
# but, if _abort_ontimeout uses GIL, won't continue to run trying to lock
# GIL -> deadlock.
def _abort_ontimeout(_tDB t, double dt, pychan nogilready not None):
cdef pychan timeoutch = time.after(dt)
cdef int fdabort = t._wcfuseabort.fileno()
emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second)
cdef char *_emsg1 = emsg1
with nogil:
# tell main thread that we entered nogil world
nogilready.chan_structZ().close()
t.__abort_ontimeout(dt, timeoutch, fdabort, _emsg1)
cdef void __abort_ontimeout(_tDB t, double dt, pychan timeoutch,
int fdabort, const char *emsg1) nogil except +topyexc:
_ = select([
timeoutch.chan_double().recvs(), # 0
t._closed.chan_structZ().recvs(), # 1
])
if _ == 1:
return # tDB closed = testcase completed
# timeout -> force-umount wcfs
writeerr(emsg1)
writeerr("-> aborting wcfs fuse connection to unblock ...\n\n")
xwrite(fdabort, b"1\n")
t._wcfuseaborted.chan_structZ().close()
# read_nogil reads mem with GIL released and returns its content.
def read_nogil(const unsigned char[::1] mem not None) -> bytes:
......@@ -94,10 +135,14 @@ cdef void on_sigbus(int sig, siginfo_t *si, void *_uc) nogil:
# writeerr writes msg to stderr without depending on stdio buffering and locking.
cdef void writeerr(const char *msg) nogil:
xwrite(2, msg)
# xwrite writes msg to fd without depending on stdio buffering and locking.
cdef void xwrite(int fd, const char *msg) nogil:
cdef ssize_t n, left = strlen(msg)
while left > 0:
n = write(2, msg, left)
n = write(fd, msg, left)
if n == -1:
return # nothing we can do under crash?
panic("write: failed")
left -= n
msg += n
......@@ -52,7 +52,7 @@ import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
from six import reraise
from .internal import io, mm
from .internal.wcfs_test import read_nogil, install_sigbus_trap, fadvise_dontneed
from .internal.wcfs_test import _tDB, read_nogil, install_sigbus_trap, fadvise_dontneed
# XXX `py.test -v` -> WENDELIN_CORE_WCFS_OPTIONS += -v=1?
......@@ -213,7 +213,7 @@ class DFile:
# tDB must be explicitly closed once no longer used.
#
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tDB:
class tDB(_tDB):
@func
def __init__(t):
t.root = testdb.dbopen()
......@@ -234,10 +234,10 @@ class tDB:
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal )
t._closed = chan()
t._wcfuseaborted = chan()
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w")
go(t._abort_ontimeout, 10*time.second) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready = chan(dtype='C.structZ')
go(t._abort_ontimeout, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil
# ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data
......@@ -272,25 +272,7 @@ class tDB:
def head(t):
return t.dFtail[-1].rev
def _abort_ontimeout(t, dt):
# XXX better run whole this function withou GIL - if a code that is
# holding GIL will access wcfs-mmapped memory, and wcfs will send pin,
# but pin handler is failing one way or another - select will wake-up
# but won't continue to run trying to lock GIL -> deadlock.
_, _rx = select(
time.after(dt).recv, # 0
t._closed.recv, # 1
)
if _ == 1:
return # tDB closed = testcase completed
# timeout -> force-umount wcfs
eprint("\nC: test timed out after %.1fs" % (dt / time.second))
eprint("-> aborting wcfs fuse connection to unblock ...\n")
t._wcfuseabort.write(b"1\n")
t._wcfuseabort.flush()
t._wcfuseaborted.close()
# _abort_ontimeout is in wcfs_test.pyx
# close closes test database as well as all tracked files, watch links and wcfs.
# it also prints change history to help developer overview current testcase.
......@@ -1832,7 +1814,7 @@ def tidfromtime(t):
return ts.raw()
# verify that tidtime is precise enough to show difference in between transactions.
# verify that tidtime -> tidfromtime is identity withing rounding tolerance.
# verify that tidtime -> tidfromtime is identity within rounding tolerance.
@func
def test_tidtime():
t = tDB()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment