Commit 69db91bf authored by Kirill Smelkov's avatar Kirill Smelkov

libgolang: Add internal semaphores

- Add semaphore alloc/free/acquire/release functionality to libgolang runtime;
- Implement semaphores for thread and gevent runtimes.

  * Thread runtime uses PyThread_acquire_lock/PyThread_release_lock +
    PyThread_acquire_lock/PyThread_release_lock, which, if used
    carefully, do not depend on GIL and on e.g. POSIX are tiny wrappers
    around sem_init(process-private) + sem_post/sem_wait(*).

  * Gevent runtime uses geven't Semaphore in Pyx mode.

- Add Sema and Mutex classes that use semaphores provided by a runtime
  in a RAII style.
- Add with_lock(mu) that mimics `with mu` in Python.

Sema and Mutex will be used in channels implementation in the followup
patch.

(*) during late testing a bug was found in CPython2 and PyPy semaphore
implementations on Darwin (technically speaking on POSIX with
_POSIX_SEMAPHORES undefined). Quoting the patch:

    FIXME On Darwin, even though this is considered as POSIX, Python uses
    mutex+condition variable to implement its lock, and, as of 20190828, Py2.7
    implementation, even though similar issue was fixed for Py3 in 2012, contains
    synchronization bug: the condition is signalled after mutex unlock while the
    correct protocol is to signal condition from under mutex:

      https://github.com/python/cpython/blob/v2.7.16-127-g0229b56d8c0/Python/thread_pthread.h#L486-L506
      https://github.com/python/cpython/commit/187aa545165d (py3 fix)

    PyPy has the same bug for both pypy2 and pypy3:

      https://bitbucket.org/pypy/pypy/src/578667b3fef9/rpython/translator/c/src/thread_pthread.c#lines-443:465
      https://bitbucket.org/pypy/pypy/src/5b42890d48c3/rpython/translator/c/src/thread_pthread.c#lines-443:465

    This way when Pygolang is used with buggy Python/darwin, the bug leads to
    frequently appearing deadlocks, while e.g. CPython3/darwin works ok.

    -> TODO maintain our own semaphore code.

So eventually we'll have push down and maintain our own semaphores,
at least for platforms we care, not to be beaten by CPython runtime bugs.
parent e4dddf15
......@@ -106,10 +106,24 @@ LIBGOLANG_API uint64_t _nanotime(void);
// libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema;
typedef struct _libgolang_runtime_ops {
// go should spawn a task (coroutine/thread/...).
void (*go)(void (*f)(void *), void *arg);
// sema_alloc should allocate a semaphore.
// if allocation fails it must return NULL.
_libgolang_sema* (*sema_alloc)(void);
// sema_free should release previously allocated semaphore.
// libgolang guarantees to call it only once and only for a semaphore
// previously successfully allocated via sema_alloc.
void (*sema_free) (_libgolang_sema*);
// sema_acquire/sema_release should acquire/release live semaphore allocated via sema_alloc.
void (*sema_acquire)(_libgolang_sema*);
void (*sema_release)(_libgolang_sema*);
// nanosleep should pause current goroutine for at least dt nanoseconds.
// nanosleep(0) is not noop - such call must be at least yielding to other goroutines.
void (*nanosleep)(uint64_t dt);
......
......@@ -22,9 +22,16 @@
from libc.stdint cimport uint64_t
cdef extern from "golang/libgolang.h" nogil:
struct _libgolang_sema
struct _libgolang_runtime_ops:
void (*go)(void (*f)(void *) nogil, void *arg);
_libgolang_sema* (*sema_alloc) ()
void (*sema_free) (_libgolang_sema*)
void (*sema_acquire)(_libgolang_sema*)
void (*sema_release)(_libgolang_sema*)
void (*nanosleep)(uint64_t)
uint64_t (*nanotime)()
......
......@@ -21,14 +21,19 @@
from __future__ import print_function, absolute_import
# Gevent runtime uses gevent's greenlets.
# Gevent runtime uses gevent's greenlets and semaphores.
# When sema.acquire() blocks, gevent switches us from current to another greenlet.
IF not PYPY:
from gevent._greenlet cimport Greenlet
from gevent.__semaphore cimport Semaphore
ctypedef Semaphore PYGSema
ELSE:
# on pypy gevent does not compile greenlet.py citing that
# on pypy gevent does not compile greenlet.py and semaphore.py citing that
# "there is no greenlet.h on pypy"
from gevent.greenlet import Greenlet
from gevent._semaphore import Semaphore
ctypedef object PYGSema
from gevent import sleep as pygsleep
......@@ -36,7 +41,8 @@ from libc.stdint cimport uint64_t
from cpython cimport Py_INCREF, Py_DECREF
from cython cimport final
from golang.runtime._libgolang cimport _libgolang_runtime_ops, panic
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic
from golang.runtime cimport _runtime_thread
......@@ -68,6 +74,44 @@ cdef nogil:
panic("pyxgo: gevent: go: failed")
_libgolang_sema* sema_alloc():
with gil:
pygsema = Semaphore()
Py_INCREF(pygsema)
return <_libgolang_sema*>pygsema
# libgolang checks for NULL return
bint _sema_free(_libgolang_sema *gsema):
with gil:
pygsema = <PYGSema>gsema
Py_DECREF(pygsema)
return True
void sema_free(_libgolang_sema *gsema):
ok = _sema_free(gsema)
if not ok:
panic("pyxgo: gevent: sema: free: failed")
bint _sema_acquire(_libgolang_sema *gsema):
with gil:
pygsema = <PYGSema>gsema
pygsema.acquire()
return True
void sema_acquire(_libgolang_sema *gsema):
ok = _sema_acquire(gsema)
if not ok:
panic("pyxgo: gevent: sema: acquire: failed")
bint _sema_release(_libgolang_sema *gsema):
with gil:
pygsema = <PYGSema>gsema
pygsema.release()
return True
void sema_release(_libgolang_sema *gsema):
ok = _sema_release(gsema)
if not ok:
panic("pyxgo: gevent: sema: release: failed")
bint _nanosleep(uint64_t dt):
cdef double dt_s = dt * 1E-9
with gil:
......@@ -82,6 +126,10 @@ cdef nogil:
# XXX const
_libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops(
go = go,
sema_alloc = sema_alloc,
sema_free = sema_free,
sema_acquire = sema_acquire,
sema_release = sema_release,
nanosleep = nanosleep,
nanotime = _runtime_thread.nanotime, # reuse from _runtime_thread
)
......
......@@ -21,25 +21,66 @@
from __future__ import print_function, absolute_import
# Thread runtime reuses C-level Python threadcreate implementation
# for portability.
# Thread runtime reuses C-level Python threadcreate + semaphore implementation
# for portability. In Python semaphores do not depend on GIL and by reusing
# the implementation we can offload us from covering different systems.
#
# PyThread_start_new_thread - Python's C function function to create
# On POSIX, for example, Python uses sem_init(process-private) + sem_post/sem_wait.
#
# Similarly PyThread_start_new_thread - Python's C function function to create
# new thread - does not depend on GIL. On POSIX, for example, it is small
# wrapper around pthread_create.
#
# NOTE Cython declares PyThread_acquire_lock/PyThread_release_lock as nogil
from cpython.pythread cimport PyThread_acquire_lock, PyThread_release_lock, \
PyThread_type_lock, WAIT_LOCK
# FIXME On Darwin, even though this is considered as POSIX, Python uses
# mutex+condition variable to implement its lock, and, as of 20190828, Py2.7
# implementation, even though similar issue was fixed for Py3 in 2012, contains
# synchronization bug: the condition is signalled after mutex unlock while the
# correct protocol is to signal condition from under mutex:
#
# https://github.com/python/cpython/blob/v2.7.16-127-g0229b56d8c0/Python/thread_pthread.h#L486-L506
# https://github.com/python/cpython/commit/187aa545165d (py3 fix)
#
# PyPy has the same bug for both pypy2 and pypy3:
#
# https://bitbucket.org/pypy/pypy/src/578667b3fef9/rpython/translator/c/src/thread_pthread.c#lines-443:465
# https://bitbucket.org/pypy/pypy/src/5b42890d48c3/rpython/translator/c/src/thread_pthread.c#lines-443:465
#
# This way when Pygolang is used with buggy Python/darwin, the bug leads to
# frequently appearing deadlocks, while e.g. CPython3/darwin works ok.
#
# -> TODO maintain our own semaphore code.
import sys, platform
if 'darwin' in sys.platform:
pyimpl = platform.python_implementation()
pyver = sys.version_info
buggy = None
if 'CPython' in pyimpl and pyver < (3, 0):
buggy = "cpython2/darwin"
if 'PyPy' in pyimpl:
buggy = "pypy/darwin"
if buggy:
print("WARNING: pyxgo: thread: %s has race condition bug in runtime"
" that leads to deadlocks" % buggy, file=sys.stderr)
# make sure python threading is initialized, so that there is no concurrent
# calls to PyThread_init_thread later.
# calls to PyThread_init_thread from e.g. PyThread_allocate_lock later.
#
# This allows us to treat PyThread_start_new_thread as nogil.
# This allows us to treat PyThread_allocate_lock & PyThread_start_new_thread as nogil.
from cpython.ceval cimport PyEval_InitThreads
#PyThread_init_thread() # initializes only threading, but _not_ GIL
PyEval_InitThreads() # initializes threading and GIL
cdef extern from "pythread.h" nogil:
# NOTE py3.7 changed to `unsigned long PyThread_start_new_thread ...`
long PyThread_start_new_thread(void (*)(void *), void *)
PyThread_type_lock PyThread_allocate_lock()
void PyThread_free_lock(PyThread_type_lock)
from golang.runtime._libgolang cimport _libgolang_runtime_ops, panic
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic
from libc.stdint cimport uint64_t, UINT64_MAX
IF POSIX:
......@@ -59,6 +100,24 @@ cdef nogil:
if pytid == -1:
panic("pygo: failed")
_libgolang_sema* sema_alloc():
# python calls it "lock", but it is actually a semaphore.
# and in particular can be released by thread different from thread that acquired it.
pysema = PyThread_allocate_lock()
return <_libgolang_sema *>pysema # NULL is ok - libgolang expects it
void sema_free(_libgolang_sema *gsema):
pysema = <PyThread_type_lock>gsema
PyThread_free_lock(pysema)
void sema_acquire(_libgolang_sema *gsema):
pysema = <PyThread_type_lock>gsema
PyThread_acquire_lock(pysema, WAIT_LOCK)
void sema_release(_libgolang_sema *gsema):
pysema = <PyThread_type_lock>gsema
PyThread_release_lock(pysema)
IF POSIX:
void nanosleep(uint64_t dt):
cdef timespec ts
......@@ -110,6 +169,10 @@ cdef nogil:
# XXX const
_libgolang_runtime_ops thread_ops = _libgolang_runtime_ops(
go = go,
sema_alloc = sema_alloc,
sema_free = sema_free,
sema_acquire = sema_acquire,
sema_release = sema_release,
nanosleep = nanosleep,
nanotime = nanotime,
)
......
......@@ -29,6 +29,7 @@
#include <exception>
#include <limits>
#include <mutex> // lock_guard
#include <string>
using std::exception;
......@@ -103,6 +104,62 @@ uint64_t _nanotime() {
return _runtime->nanotime();
}
// ---- semaphores ----
// Sema provides semaphore.
struct Sema {
_libgolang_sema *_gsema;
Sema();
~Sema();
void acquire();
void release();
private:
Sema(const Sema&); // don't copy
};
Sema::Sema() {
Sema *sema = this;
sema->_gsema = _runtime->sema_alloc();
if (!sema->_gsema)
panic("sema: alloc failed");
}
Sema::~Sema() {
Sema *sema = this;
_runtime->sema_free(sema->_gsema);
sema->_gsema = NULL;
}
void Sema::acquire() {
Sema *sema = this;
_runtime->sema_acquire(sema->_gsema);
}
void Sema::release() {
Sema *sema = this;
_runtime->sema_release(sema->_gsema);
}
// Mutex provides mutex.
// currently implemented via Sema.
struct Mutex {
void lock() { _sema.acquire(); }
void unlock() { _sema.release(); }
Mutex() {}
private:
Sema _sema;
Mutex(const Mutex&); // don't copy
};
// with_lock mimics `with mu` from python.
#define with_lock(mu) std::lock_guard<Mutex> _with_lock_ ## __COUNTER__ (mu)
} // golang::
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment