Commit d75b2304 authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: Move _abort_ontimeout to pyx/nogil

parent 5e60ad89
...@@ -17,10 +17,9 @@ ...@@ -17,10 +17,9 @@
# #
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from setuptools import setup, Extension, Command, find_packages from golang.pyx.build import setup, Extension as PyGoExt, build_ext as _build_ext
from setuptools import Extension, Command, find_packages
from setuptools.command.build_py import build_py as _build_py from setuptools.command.build_py import build_py as _build_py
from setuptools.command.build_ext import build_ext as _build_ext
from Cython.Build import cythonize # XXX -> golang.pyx.build.Extension ?
from pkg_resources import working_set, EntryPoint from pkg_resources import working_set, EntryPoint
from distutils.errors import DistutilsExecError from distutils.errors import DistutilsExecError
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
...@@ -230,7 +229,18 @@ setup( ...@@ -230,7 +229,18 @@ setup(
keywords = 'bigdata out-of-core numpy virtual-memory', keywords = 'bigdata out-of-core numpy virtual-memory',
ext_modules = [_bigfile] + cythonize("wcfs/internal/*.pyx"), ext_modules = [
_bigfile,
PyGoExt('wcfs.internal.wcfs_test',
['wcfs/internal/wcfs_test.pyx']),
Extension('wcfs.internal.io',
['wcfs/internal/io.pyx']),
Extension('wcfs.internal.mm',
['wcfs/internal/mm.pyx']),
],
package_dir = {'wendelin': ''}, package_dir = {'wendelin': ''},
packages = ['wendelin'] + ['wendelin.%s' % _ for _ in packages = ['wendelin'] + ['wendelin.%s' % _ for _ in
......
...@@ -179,7 +179,7 @@ def _pinner(wconn, ctx): ...@@ -179,7 +179,7 @@ def _pinner(wconn, ctx):
if exc in (None, context.canceled): # canceled = .close asks pinner to stop if exc in (None, context.canceled): # canceled = .close asks pinner to stop
return return
log.critical('pinner failed:', exc_info=1) log.critical('pinner failed:', exc_info=1)
print('CRITICAL: pinned failed:', file=sys.stderr) print('CRITICAL: pinner failed:', file=sys.stderr)
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
print('\nCRITICAL: wcfs server will likely kill us soon.', file=sys.stderr) print('\nCRITICAL: wcfs server will likely kill us soon.', file=sys.stderr)
defer(_) defer(_)
......
/io.c /io.c
/mm.c /mm.c
/wcfs_test.c /wcfs_test.cpp
...@@ -18,8 +18,9 @@ ...@@ -18,8 +18,9 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2 # cython: language_level=2
# distutils: language=c++
"""Module wcfs_test complements wcfs_test.py with things that are impossible to implement in Python""" """Module wcfs_test complements wcfs_test.py with things that are not to implement in Python"""
from posix.signal cimport sigaction, sigaction_t, siginfo_t, SA_SIGINFO from posix.signal cimport sigaction, sigaction_t, siginfo_t, SA_SIGINFO
from libc.signal cimport SIGBUS from libc.signal cimport SIGBUS
...@@ -29,11 +30,51 @@ from posix.unistd cimport write, sleep ...@@ -29,11 +30,51 @@ from posix.unistd cimport write, sleep
from posix.types cimport off_t from posix.types cimport off_t
from cpython.exc cimport PyErr_SetFromErrno from cpython.exc cimport PyErr_SetFromErrno
#from cpython.pystate cimport PyGILState_Ensure, PyGILState_Release, PyGILState_STATE
# XXX -> cpython.lifecycle? .run ? from golang cimport pychan, select, panic, topyexc
cdef extern from "Python.h": from golang import time
int PyRun_SimpleString(const char *)
# _tDB is pyx part of tDB.
cdef class _tDB:
cdef readonly pychan _closed # chan[structZ]
cdef readonly pychan _wcfuseaborted # chan[structZ]
def __cinit__(_tDB t):
t._closed = pychan(dtype='C.structZ')
t._wcfuseaborted = pychan(dtype='C.structZ')
# _abort_ontimeout sends abort to fuse control file if timeout happens
# before tDB is closed.
#
# It runs without GIL to avoid deadlock: e.g. if a code that is
# holding GIL will access wcfs-mmapped memory, and wcfs will send pin,
# but pin handler is failing one way or another - select will wake-up
# but, if _abort_ontimeout uses GIL, won't continue to run trying to lock
# GIL -> deadlock.
def _abort_ontimeout(_tDB t, double dt, pychan nogilready not None):
cdef pychan timeoutch = time.after(dt)
cdef int fdabort = t._wcfuseabort.fileno()
emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second)
cdef char *_emsg1 = emsg1
with nogil:
# tell main thread that we entered nogil world
nogilready.chan_structZ().close()
t.__abort_ontimeout(dt, timeoutch, fdabort, _emsg1)
cdef void __abort_ontimeout(_tDB t, double dt, pychan timeoutch,
int fdabort, const char *emsg1) nogil except +topyexc:
_ = select([
timeoutch.chan_double().recvs(), # 0
t._closed.chan_structZ().recvs(), # 1
])
if _ == 1:
return # tDB closed = testcase completed
# timeout -> force-umount wcfs
writeerr(emsg1)
writeerr("-> aborting wcfs fuse connection to unblock ...\n\n")
xwrite(fdabort, b"1\n")
t._wcfuseaborted.chan_structZ().close()
# read_nogil reads mem with GIL released and returns its content. # read_nogil reads mem with GIL released and returns its content.
def read_nogil(const unsigned char[::1] mem not None) -> bytes: def read_nogil(const unsigned char[::1] mem not None) -> bytes:
...@@ -94,10 +135,14 @@ cdef void on_sigbus(int sig, siginfo_t *si, void *_uc) nogil: ...@@ -94,10 +135,14 @@ cdef void on_sigbus(int sig, siginfo_t *si, void *_uc) nogil:
# writeerr writes msg to stderr without depending on stdio buffering and locking. # writeerr writes msg to stderr without depending on stdio buffering and locking.
cdef void writeerr(const char *msg) nogil: cdef void writeerr(const char *msg) nogil:
xwrite(2, msg)
# xwrite writes msg to fd without depending on stdio buffering and locking.
cdef void xwrite(int fd, const char *msg) nogil:
cdef ssize_t n, left = strlen(msg) cdef ssize_t n, left = strlen(msg)
while left > 0: while left > 0:
n = write(2, msg, left) n = write(fd, msg, left)
if n == -1: if n == -1:
return # nothing we can do under crash? panic("write: failed")
left -= n left -= n
msg += n msg += n
...@@ -52,7 +52,7 @@ import pytest; xfail = pytest.mark.xfail ...@@ -52,7 +52,7 @@ import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail from pytest import raises, fail
from six import reraise from six import reraise
from .internal import io, mm from .internal import io, mm
from .internal.wcfs_test import read_nogil, install_sigbus_trap, fadvise_dontneed from .internal.wcfs_test import _tDB, read_nogil, install_sigbus_trap, fadvise_dontneed
# XXX `py.test -v` -> WENDELIN_CORE_WCFS_OPTIONS += -v=1? # XXX `py.test -v` -> WENDELIN_CORE_WCFS_OPTIONS += -v=1?
...@@ -213,7 +213,7 @@ class DFile: ...@@ -213,7 +213,7 @@ class DFile:
# tDB must be explicitly closed once no longer used. # tDB must be explicitly closed once no longer used.
# #
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ? # XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tDB: class tDB(_tDB):
@func @func
def __init__(t): def __init__(t):
t.root = testdb.dbopen() t.root = testdb.dbopen()
...@@ -234,10 +234,10 @@ class tDB: ...@@ -234,10 +234,10 @@ class tDB:
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel. # cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to # ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal ) # still wait for request completion even after fatal signal )
t._closed = chan()
t._wcfuseaborted = chan()
t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w") t._wcfuseabort = open("/sys/fs/fuse/connections/%d/abort" % os.stat(testmntpt).st_dev, "w")
go(t._abort_ontimeout, 10*time.second) # NOTE must be: with_timeout << · << wcfs_pin_timeout nogilready = chan(dtype='C.structZ')
go(t._abort_ontimeout, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
nogilready.recv() # wait till _abort_ontimeout enters nogil
# ZBigFile(s) scheduled for commit # ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data t._changed = {} # ZBigFile -> {} blk -> data
...@@ -272,25 +272,7 @@ class tDB: ...@@ -272,25 +272,7 @@ class tDB:
def head(t): def head(t):
return t.dFtail[-1].rev return t.dFtail[-1].rev
# _abort_ontimeout is in wcfs_test.pyx
def _abort_ontimeout(t, dt):
# XXX better run whole this function withou GIL - if a code that is
# holding GIL will access wcfs-mmapped memory, and wcfs will send pin,
# but pin handler is failing one way or another - select will wake-up
# but won't continue to run trying to lock GIL -> deadlock.
_, _rx = select(
time.after(dt).recv, # 0
t._closed.recv, # 1
)
if _ == 1:
return # tDB closed = testcase completed
# timeout -> force-umount wcfs
eprint("\nC: test timed out after %.1fs" % (dt / time.second))
eprint("-> aborting wcfs fuse connection to unblock ...\n")
t._wcfuseabort.write(b"1\n")
t._wcfuseabort.flush()
t._wcfuseaborted.close()
# close closes test database as well as all tracked files, watch links and wcfs. # close closes test database as well as all tracked files, watch links and wcfs.
# it also prints change history to help developer overview current testcase. # it also prints change history to help developer overview current testcase.
...@@ -1832,7 +1814,7 @@ def tidfromtime(t): ...@@ -1832,7 +1814,7 @@ def tidfromtime(t):
return ts.raw() return ts.raw()
# verify that tidtime is precise enough to show difference in between transactions. # verify that tidtime is precise enough to show difference in between transactions.
# verify that tidtime -> tidfromtime is identity withing rounding tolerance. # verify that tidtime -> tidfromtime is identity within rounding tolerance.
@func @func
def test_tidtime(): def test_tidtime():
t = tDB() t = tDB()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment