Commit 34b7a1f4 authored by Kirill Smelkov's avatar Kirill Smelkov

golang: Expose Sema and Mutex as public Python and Cython/nogil API

Until now libgolang had semaphore and mutex functionality, but
implemented only internally and not exposed to users. Since for its
pinner thread wendelin.core v2 needs not only nogil channels, but also
nogil mutexes, timers and so on, it is now time to start providing that.

We start with mutexes:

- Expose Mutex from insides of libgolang.cpp to public API in
  libgolang.h . We actually expose both Mutex and Sema because
  semaphores are sometimes also useful for low-level synchronization,
  and because it is easier to export Mutex and Sema both at the same time.

- Provide pyx/nogil API to those as sync.Mutex and sync.Sema.

- Provide corresponding python wrappers.

- Add Pyx-level test for semaphore wakeup when wakeup is done not by the
  same thread which did the original acquire. This is the test that was
  promised in 5142460d (libgolang/thread: Add links to upstream
  PyThread_release_lock bug), and it used to corrupt memory and deadlock
  on macOS due to CPython & PyPy runtime bugs:

    https://bugs.python.org/issue38106
      -> https://github.com/python/cpython/pull/16047
    https://bitbucket.org/pypy/pypy/issues/3072

Note about python wrappers: At present, due to
https://github.com/cython/cython/issues/3165, C-level panic is not
properly translated into Py-level exception in (Py)Sema/Mutex constructors.
This should not pose a real problem in practice though.
parent f0ac6e45
...@@ -290,7 +290,7 @@ handle concurrency in structured ways: ...@@ -290,7 +290,7 @@ handle concurrency in structured ways:
- `golang.sync` provides `sync.WorkGroup` to spawn group of goroutines working - `golang.sync` provides `sync.WorkGroup` to spawn group of goroutines working
on a common task. It also provides low-level primitives - for example on a common task. It also provides low-level primitives - for example
`sync.Once` and `sync.WaitGroup` - that are sometimes useful too. `sync.Once`, `sync.WaitGroup` and `sync.Mutex` - that are sometimes useful too.
- `golang.time` provides timers integrated with channels. - `golang.time` provides timers integrated with channels.
......
/_golang.cpp /_golang.cpp
/_golang_test.cpp /_golang_test.cpp
/_sync.cpp
/_sync_test.cpp
/_time.cpp /_time.cpp
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package sync mirrors Go package sync.
See the following link about Go sync package:
https://golang.org/pkg/sync
"""
cdef extern from "golang/libgolang.h" namespace "golang::sync" nogil:
cppclass Sema:
Sema()
void acquire()
void release()
cppclass Mutex:
void lock()
void unlock()
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""_sync.pyx implements sync.pyx - see _sync.pxd for package overview."""
from __future__ import print_function, absolute_import
from cython cimport final
@final
cdef class PySema:
cdef Sema sema
# FIXME cannot catch/pyreraise panic of .sema ctor
# https://github.com/cython/cython/issues/3165
def acquire(pysema):
with nogil:
semaacquire_pyexc(&pysema.sema)
def release(pysema):
semarelease_pyexc(&pysema.sema)
# with support
__enter__ = acquire
def __exit__(pysema, exc_typ, exc_val, exc_tb):
pysema.release()
@final
cdef class PyMutex:
cdef Mutex mu
# FIXME cannot catch/pyreraise panic of .mu ctor
# https://github.com/cython/cython/issues/3165
def lock(pymu):
with nogil:
mutexlock_pyexc(&pymu.mu)
def unlock(pymu):
mutexunlock_pyexc(&pymu.mu)
# with support
__enter__ = lock
def __exit__(pymu, exc_typ, exc_val, exc_tb):
pymu.unlock()
# ---- misc ----
from golang cimport topyexc
cdef nogil:
void semaacquire_pyexc(Sema *sema) except +topyexc:
sema.acquire()
void semarelease_pyexc(Sema *sema) except +topyexc:
sema.release()
void mutexlock_pyexc(Mutex *mu) except +topyexc:
mu.lock()
void mutexunlock_pyexc(Mutex *mu) except +topyexc:
mu.unlock()
# -*- coding: utf-8 -*-
# cython: language_level=2
# distutils: language=c++
#
# Copyright (C) 2018-2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# small tests that verifies pyx-level channel API.
# the work of channels themselves is exercised thoroughly mostly in golang_test.py
from __future__ import print_function, absolute_import
from golang cimport go, chan, makechan, structZ, nil, panic, topyexc
from golang cimport sync, time
# C-level _sema + _mutex
# (not exposed in golang.pxd as it exposes only high-level API)
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
"""
namespace golang {
void _mutex_init(sync::Mutex *mu) {
new (mu) sync::Mutex();
}
void _mutex_destroy(sync::Mutex *mu) {
mu->~Mutex();
}
} // golang::
"""
struct _sema
_sema *_makesema()
void _semafree(_sema *)
void _semaacquire(_sema *)
void _semarelease(_sema *)
void _mutex_init(sync.Mutex *)
void _mutex_destroy(sync.Mutex *)
from libc.stdlib cimport calloc, free
from libc.stdio cimport printf
# test for semaphore wakeup, when wakeup is done not by the same thread which
# did the original acquire. This used to corrupt memory and deadlock on macOS
# due to CPython & PyPy runtime bugs:
# https://bugs.python.org/issue38106
# https://bitbucket.org/pypy/pypy/issues/3072
cdef nogil:
struct WorkState:
sync.Mutex mu # protects vvv
_sema *sema # T1 <- T2 wakeup; reallocated on every iteration
bint stop # T1 -> T2: request to stop
chan[structZ] done # T1 <- T2: stopped
void _test_sema_wakeup_T2(void *_state):
state = <WorkState*>_state
cdef int i = 0, j
cdef bint stop
cdef double Wstart, now
while 1:
i += 1
# wait till .sema != NULL and pop it
Wstart = time.now()
j = 0
while 1:
state.mu.lock()
sema = state.sema
if sema != NULL:
state.sema = NULL
stop = state.stop
state.mu.unlock()
if stop:
state.done.close()
return
if sema != NULL:
break
now = time.now()
if (now - Wstart) > 3*time.second:
printf("\nSTUCK on iteration #%d\n", i)
panic("STUCK")
# workaround for "gevent" runtime: yield CPU so that T1 can run
# XXX better yield always for "gevent", don't yield at all for "thread"
j += 1
if (j % 100) == 0:
time.sleep(0)
# we have popped .sema from state. This means that peer is _likely_ waiting on it
_semarelease(sema) # either release or release + wakeup in-progress acquire
void _test_sema_wakeup() except +topyexc:
cdef WorkState *state = <WorkState *>calloc(1, sizeof(WorkState))
if state == NULL:
panic("malloc -> NULL")
state.sema = NULL
_mutex_init(&state.mu)
state.stop = False
state.done = makechan[structZ]()
go(_test_sema_wakeup_T2, state)
N = 100000
cdef _sema *sema_prev = NULL
for i in range(N):
sema = _makesema()
_semaacquire(sema)
#printf("d(sema_prev, sema): %ld\n", <char*>sema - <char*>sema_prev)
state.mu.lock()
state.sema = sema
state.mu.unlock()
_semaacquire(sema)
# _semarelease(sema) # (to free sema in released state)
# # (gets stuck both with and without it)
_semafree(sema)
sema_prev = sema
state.mu.lock()
state.stop = True
state.mu.unlock()
state.done.recv()
state.done = nil
_mutex_destroy(&state.mu)
free(state)
def test_sema_wakeup():
with nogil:
_test_sema_wakeup()
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
// lifetime management. // lifetime management.
// - `panic` throws exception that represent C-level panic. // - `panic` throws exception that represent C-level panic.
// - `time::sleep` pauses current task. // - `time::sleep` pauses current task.
// - `sync::Sema` and `sync::Mutex` provide low-level synchronization.
// //
// For example: // For example:
// //
...@@ -72,6 +73,7 @@ ...@@ -72,6 +73,7 @@
// - `_chansend` and `_chanrecv` send/receive over raw channel. // - `_chansend` and `_chanrecv` send/receive over raw channel.
// - `_chanselect`, `_selsend`, `_selrecv`, ... provide raw select functionality. // - `_chanselect`, `_selsend`, `_selrecv`, ... provide raw select functionality.
// - `_tasknanosleep` pauses current task. // - `_tasknanosleep` pauses current task.
// - `_makesema` and `_sema*` provide semaphore functionality.
// //
// //
// Runtimes // Runtimes
...@@ -197,6 +199,14 @@ _selcase _selrecv_(_chan *ch, void *prx, bool *pok) { ...@@ -197,6 +199,14 @@ _selcase _selrecv_(_chan *ch, void *prx, bool *pok) {
// _default represents default case for _chanselect. // _default represents default case for _chanselect.
extern LIBGOLANG_API const _selcase _default; extern LIBGOLANG_API const _selcase _default;
// _sema corresponds to sync.Sema
// no C-level analog is provided for sync.Mutex
typedef struct _sema _sema;
LIBGOLANG_API _sema *_makesema();
LIBGOLANG_API void _semafree(_sema *sema);
LIBGOLANG_API void _semaacquire(_sema *sema);
LIBGOLANG_API void _semarelease(_sema *sema);
// libgolang runtime - the runtime must be initialized before any other libgolang use. // libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema; typedef struct _libgolang_sema _libgolang_sema;
...@@ -401,6 +411,44 @@ LIBGOLANG_API double now(); ...@@ -401,6 +411,44 @@ LIBGOLANG_API double now();
} // golang::time:: } // golang::time::
// golang::sync::
namespace sync {
// Sema provides semaphore.
class Sema {
_sema *_gsema;
public:
LIBGOLANG_API Sema();
LIBGOLANG_API ~Sema();
LIBGOLANG_API void acquire();
LIBGOLANG_API void release();
private:
Sema(const Sema&); // don't copy
Sema(Sema&&); // don't move
};
// Mutex provides mutex.
class Mutex {
Sema _sema;
public:
LIBGOLANG_API Mutex();
LIBGOLANG_API ~Mutex();
LIBGOLANG_API void lock();
LIBGOLANG_API void unlock();
private:
Mutex(const Mutex&); // don't copy
Mutex(Mutex&&); // don't move
};
} // golang::sync::
} // golang:: } // golang::
#endif // __cplusplus #endif // __cplusplus
......
...@@ -133,58 +133,61 @@ uint64_t _nanotime() { ...@@ -133,58 +133,61 @@ uint64_t _nanotime() {
// ---- semaphores ---- // ---- semaphores ----
// (_sema = _libgolang_sema)
// Sema provides semaphore. // _makesema creates new semaphore.
struct Sema { //
_libgolang_sema *_gsema; // it always returns !NULL and panics on memory allocation failue.
_sema *_makesema() {
_sema *sema = (_sema *)_runtime->sema_alloc();
if (!sema)
panic("makesema: alloc failed");
return sema;
}
Sema(); void _semafree(_sema *sema) {
~Sema(); _runtime->sema_free((_libgolang_sema*)sema);
void acquire(); }
void release();
private: void _semaacquire(_sema *sema) {
Sema(const Sema&); // don't copy _runtime->sema_acquire((_libgolang_sema *)sema);
Sema(Sema&&); // don't move }
};
void _semarelease(_sema *sema) {
_runtime->sema_release((_libgolang_sema *)sema);
}
// golang::sync::
namespace sync {
Sema::Sema() { Sema::Sema() {
Sema *sema = this; Sema *sema = this;
sema->_gsema = _makesema();
sema->_gsema = _runtime->sema_alloc();
if (!sema->_gsema)
panic("sema: alloc failed");
} }
Sema::~Sema() { Sema::~Sema() {
Sema *sema = this; Sema *sema = this;
_semafree(sema->_gsema);
_runtime->sema_free(sema->_gsema);
sema->_gsema = NULL; sema->_gsema = NULL;
} }
void Sema::acquire() { void Sema::acquire() {
Sema *sema = this; Sema *sema = this;
_runtime->sema_acquire(sema->_gsema); _semaacquire(sema->_gsema);
} }
void Sema::release() { void Sema::release() {
Sema *sema = this; Sema *sema = this;
_runtime->sema_release(sema->_gsema); _semarelease(sema->_gsema);
} }
// Mutex provides mutex. // Mutex is currently implemented via Sema.
// currently implemented via Sema. Mutex::Mutex() {}
struct Mutex { Mutex::~Mutex() {}
void lock() { _sema.acquire(); } void Mutex::lock() { _sema.acquire(); }
void unlock() { _sema.release(); } void Mutex::unlock() { _sema.release(); }
Mutex() {}
private: } // golang::sync::
Sema _sema;
Mutex(const Mutex&); // don't copy
Mutex(Mutex&&); // don't move
};
// with_lock mimics `with mu` from python. // with_lock mimics `with mu` from python.
// usage example: // usage example:
...@@ -197,9 +200,9 @@ private: ...@@ -197,9 +200,9 @@ private:
_with_lock.once(); \ _with_lock.once(); \
) )
struct _with_lock_guard { struct _with_lock_guard {
std::lock_guard<Mutex> mug; std::lock_guard<sync::Mutex> mug;
bool done; bool done;
_with_lock_guard(Mutex &mu) : mug(mu), done(false) {} _with_lock_guard(sync::Mutex &mu) : mug(mu), done(false) {}
bool once() { bool _ = !done; done = true; return _; } bool once() { bool _ = !done; done = true; return _; }
}; };
...@@ -240,7 +243,7 @@ struct _chan { ...@@ -240,7 +243,7 @@ struct _chan {
unsigned _cap; // channel capacity (in elements) unsigned _cap; // channel capacity (in elements)
unsigned _elemsize; // size of element unsigned _elemsize; // size of element
Mutex _mu; sync::Mutex _mu;
list_head _recvq; // blocked receivers (_ -> _RecvSendWaiting.in_rxtxq) list_head _recvq; // blocked receivers (_ -> _RecvSendWaiting.in_rxtxq)
list_head _sendq; // blocked senders (_ -> _RecvSendWaiting.in_rxtxq) list_head _sendq; // blocked senders (_ -> _RecvSendWaiting.in_rxtxq)
bool _closed; bool _closed;
...@@ -303,9 +306,9 @@ private: ...@@ -303,9 +306,9 @@ private:
// //
// Only 1 waiter from the group can succeed waiting. // Only 1 waiter from the group can succeed waiting.
struct _WaitGroup { struct _WaitGroup {
Sema _sema; // used for wakeup sync::Sema _sema; // used for wakeup
Mutex _mu; // lock NOTE ∀ chan order is always: chan._mu > ._mu sync::Mutex _mu; // lock NOTE ∀ chan order is always: chan._mu > ._mu
// on wakeup: sender|receiver -> group: // on wakeup: sender|receiver -> group:
// .which _{Send|Recv}Waiting instance which succeeded waiting. // .which _{Send|Recv}Waiting instance which succeeded waiting.
const _RecvSendWaiting *which; const _RecvSendWaiting *which;
...@@ -423,7 +426,7 @@ _chan *_makechan(unsigned elemsize, unsigned size) { ...@@ -423,7 +426,7 @@ _chan *_makechan(unsigned elemsize, unsigned size) {
ch = (_chan *)zalloc(sizeof(_chan) + size*elemsize); ch = (_chan *)zalloc(sizeof(_chan) + size*elemsize);
if (ch == NULL) if (ch == NULL)
panic("makechan: alloc failed"); panic("makechan: alloc failed");
new (&ch->_mu) Mutex(); new (&ch->_mu) sync::Mutex();
ch->_refcnt = 1; ch->_refcnt = 1;
ch->_cap = size; ch->_cap = size;
...@@ -1151,7 +1154,7 @@ void _blockforever() { ...@@ -1151,7 +1154,7 @@ void _blockforever() {
// take a lock twice. It will forever block on the second lock attempt. // take a lock twice. It will forever block on the second lock attempt.
// Under gevent, similarly to Go, this raises "LoopExit: This operation // Under gevent, similarly to Go, this raises "LoopExit: This operation
// would block forever", if there are no other greenlets scheduled to be run. // would block forever", if there are no other greenlets scheduled to be run.
Sema dead; sync::Sema dead;
dead.acquire(); dead.acquire();
dead.acquire(); dead.acquire();
bug("_blockforever: woken up"); bug("_blockforever: woken up");
......
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package sync mirrors Go package time.
See _sync.pxd for package documentation.
"""
# redirect cimport: golang.sync -> golang._sync (see __init__.pxd for rationale)
from golang._sync cimport *
...@@ -32,6 +32,11 @@ from golang import context ...@@ -32,6 +32,11 @@ from golang import context
import six import six
from golang._sync import \
PySema as Sema, \
PyMutex as Mutex \
# Once allows to execute an action only once. # Once allows to execute an action only once.
# #
# For example: # For example:
......
...@@ -24,10 +24,66 @@ from golang import go, chan ...@@ -24,10 +24,66 @@ from golang import go, chan
from golang import sync, context, time from golang import sync, context, time
import threading import threading
from pytest import raises from pytest import raises
from golang.golang_test import panics from golang.golang_test import import_pyx_tests, panics
from golang.time_test import dt
from six.moves import range as xrange from six.moves import range as xrange
import six import six
import_pyx_tests("golang._sync_test")
def _test_mutex(mu, lock, unlock):
# verify that g2 mu.lock() blocks until g1 does mu.unlock()
getattr(mu, lock)()
l = []
done = chan()
def _():
getattr(mu, lock)()
l.append('b')
getattr(mu, unlock)()
done.close()
go(_)
time.sleep(1*dt)
l.append('a')
getattr(mu, unlock)()
done.recv()
assert l == ['a', 'b']
# the same via with
with mu:
l = []
done = chan()
def _():
with mu:
l.append('d')
done.close()
go(_)
time.sleep(1*dt)
l.append('c')
done.recv()
assert l == ['c', 'd']
def test_mutex(): _test_mutex(sync.Mutex(), 'lock', 'unlock')
def test_sema(): _test_mutex(sync.Sema(), 'acquire', 'release')
# verify that sema.acquire can be woken up by sema.release not from the same
# thread which did the original sema.acquire.
def test_sema_wakeup_different_thread():
sema = sync.Sema()
sema.acquire()
l = []
done = chan()
def _():
time.sleep(1*dt)
l.append('a')
sema.release()
done.close()
go(_)
sema.acquire()
l.append('b')
done.recv()
assert l == ['a', 'b']
def test_once(): def test_once():
once = sync.Once() once = sync.Once()
l = [] l = []
......
...@@ -214,6 +214,11 @@ setup( ...@@ -214,6 +214,11 @@ setup(
'golang/runtime/libgolang_test_c.c', 'golang/runtime/libgolang_test_c.c',
'golang/runtime/libgolang_test.cpp']), 'golang/runtime/libgolang_test.cpp']),
Ext('golang._sync',
['golang/_sync.pyx']),
Ext('golang._sync_test',
['golang/_sync_test.pyx']),
Ext('golang._time', Ext('golang._time',
['golang/_time.pyx']), ['golang/_time.pyx']),
], ],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment