Commit 34b7a1f4 authored by Kirill Smelkov's avatar Kirill Smelkov

golang: Expose Sema and Mutex as public Python and Cython/nogil API

Until now libgolang had semaphore and mutex functionality, but
implemented only internally and not exposed to users. Since for its
pinner thread wendelin.core v2 needs not only nogil channels, but also
nogil mutexes, timers and so on, it is now time to start providing that.

We start with mutexes:

- Expose Mutex from insides of libgolang.cpp to public API in
  libgolang.h . We actually expose both Mutex and Sema because
  semaphores are sometimes also useful for low-level synchronization,
  and because it is easier to export Mutex and Sema both at the same time.

- Provide pyx/nogil API to those as sync.Mutex and sync.Sema.

- Provide corresponding python wrappers.

- Add Pyx-level test for semaphore wakeup when wakeup is done not by the
  same thread which did the original acquire. This is the test that was
  promised in 5142460d (libgolang/thread: Add links to upstream
  PyThread_release_lock bug), and it used to corrupt memory and deadlock
  on macOS due to CPython & PyPy runtime bugs:

    https://bugs.python.org/issue38106
      -> https://github.com/python/cpython/pull/16047
    https://bitbucket.org/pypy/pypy/issues/3072

Note about python wrappers: At present, due to
https://github.com/cython/cython/issues/3165, C-level panic is not
properly translated into Py-level exception in (Py)Sema/Mutex constructors.
This should not pose a real problem in practice though.
parent f0ac6e45
......@@ -290,7 +290,7 @@ handle concurrency in structured ways:
- `golang.sync` provides `sync.WorkGroup` to spawn group of goroutines working
on a common task. It also provides low-level primitives - for example
`sync.Once` and `sync.WaitGroup` - that are sometimes useful too.
`sync.Once`, `sync.WaitGroup` and `sync.Mutex` - that are sometimes useful too.
- `golang.time` provides timers integrated with channels.
......
/_golang.cpp
/_golang_test.cpp
/_sync.cpp
/_sync_test.cpp
/_time.cpp
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package sync mirrors Go package sync.
See the following link about Go sync package:
https://golang.org/pkg/sync
"""
cdef extern from "golang/libgolang.h" namespace "golang::sync" nogil:
cppclass Sema:
Sema()
void acquire()
void release()
cppclass Mutex:
void lock()
void unlock()
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""_sync.pyx implements sync.pyx - see _sync.pxd for package overview."""
from __future__ import print_function, absolute_import
from cython cimport final
@final
cdef class PySema:
cdef Sema sema
# FIXME cannot catch/pyreraise panic of .sema ctor
# https://github.com/cython/cython/issues/3165
def acquire(pysema):
with nogil:
semaacquire_pyexc(&pysema.sema)
def release(pysema):
semarelease_pyexc(&pysema.sema)
# with support
__enter__ = acquire
def __exit__(pysema, exc_typ, exc_val, exc_tb):
pysema.release()
@final
cdef class PyMutex:
cdef Mutex mu
# FIXME cannot catch/pyreraise panic of .mu ctor
# https://github.com/cython/cython/issues/3165
def lock(pymu):
with nogil:
mutexlock_pyexc(&pymu.mu)
def unlock(pymu):
mutexunlock_pyexc(&pymu.mu)
# with support
__enter__ = lock
def __exit__(pymu, exc_typ, exc_val, exc_tb):
pymu.unlock()
# ---- misc ----
from golang cimport topyexc
cdef nogil:
void semaacquire_pyexc(Sema *sema) except +topyexc:
sema.acquire()
void semarelease_pyexc(Sema *sema) except +topyexc:
sema.release()
void mutexlock_pyexc(Mutex *mu) except +topyexc:
mu.lock()
void mutexunlock_pyexc(Mutex *mu) except +topyexc:
mu.unlock()
# -*- coding: utf-8 -*-
# cython: language_level=2
# distutils: language=c++
#
# Copyright (C) 2018-2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# small tests that verifies pyx-level channel API.
# the work of channels themselves is exercised thoroughly mostly in golang_test.py
from __future__ import print_function, absolute_import
from golang cimport go, chan, makechan, structZ, nil, panic, topyexc
from golang cimport sync, time
# C-level _sema + _mutex
# (not exposed in golang.pxd as it exposes only high-level API)
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
"""
namespace golang {
void _mutex_init(sync::Mutex *mu) {
new (mu) sync::Mutex();
}
void _mutex_destroy(sync::Mutex *mu) {
mu->~Mutex();
}
} // golang::
"""
struct _sema
_sema *_makesema()
void _semafree(_sema *)
void _semaacquire(_sema *)
void _semarelease(_sema *)
void _mutex_init(sync.Mutex *)
void _mutex_destroy(sync.Mutex *)
from libc.stdlib cimport calloc, free
from libc.stdio cimport printf
# test for semaphore wakeup, when wakeup is done not by the same thread which
# did the original acquire. This used to corrupt memory and deadlock on macOS
# due to CPython & PyPy runtime bugs:
# https://bugs.python.org/issue38106
# https://bitbucket.org/pypy/pypy/issues/3072
cdef nogil:
struct WorkState:
sync.Mutex mu # protects vvv
_sema *sema # T1 <- T2 wakeup; reallocated on every iteration
bint stop # T1 -> T2: request to stop
chan[structZ] done # T1 <- T2: stopped
void _test_sema_wakeup_T2(void *_state):
state = <WorkState*>_state
cdef int i = 0, j
cdef bint stop
cdef double Wstart, now
while 1:
i += 1
# wait till .sema != NULL and pop it
Wstart = time.now()
j = 0
while 1:
state.mu.lock()
sema = state.sema
if sema != NULL:
state.sema = NULL
stop = state.stop
state.mu.unlock()
if stop:
state.done.close()
return
if sema != NULL:
break
now = time.now()
if (now - Wstart) > 3*time.second:
printf("\nSTUCK on iteration #%d\n", i)
panic("STUCK")
# workaround for "gevent" runtime: yield CPU so that T1 can run
# XXX better yield always for "gevent", don't yield at all for "thread"
j += 1
if (j % 100) == 0:
time.sleep(0)
# we have popped .sema from state. This means that peer is _likely_ waiting on it
_semarelease(sema) # either release or release + wakeup in-progress acquire
void _test_sema_wakeup() except +topyexc:
cdef WorkState *state = <WorkState *>calloc(1, sizeof(WorkState))
if state == NULL:
panic("malloc -> NULL")
state.sema = NULL
_mutex_init(&state.mu)
state.stop = False
state.done = makechan[structZ]()
go(_test_sema_wakeup_T2, state)
N = 100000
cdef _sema *sema_prev = NULL
for i in range(N):
sema = _makesema()
_semaacquire(sema)
#printf("d(sema_prev, sema): %ld\n", <char*>sema - <char*>sema_prev)
state.mu.lock()
state.sema = sema
state.mu.unlock()
_semaacquire(sema)
# _semarelease(sema) # (to free sema in released state)
# # (gets stuck both with and without it)
_semafree(sema)
sema_prev = sema
state.mu.lock()
state.stop = True
state.mu.unlock()
state.done.recv()
state.done = nil
_mutex_destroy(&state.mu)
free(state)
def test_sema_wakeup():
with nogil:
_test_sema_wakeup()
......@@ -40,6 +40,7 @@
// lifetime management.
// - `panic` throws exception that represent C-level panic.
// - `time::sleep` pauses current task.
// - `sync::Sema` and `sync::Mutex` provide low-level synchronization.
//
// For example:
//
......@@ -72,6 +73,7 @@
// - `_chansend` and `_chanrecv` send/receive over raw channel.
// - `_chanselect`, `_selsend`, `_selrecv`, ... provide raw select functionality.
// - `_tasknanosleep` pauses current task.
// - `_makesema` and `_sema*` provide semaphore functionality.
//
//
// Runtimes
......@@ -197,6 +199,14 @@ _selcase _selrecv_(_chan *ch, void *prx, bool *pok) {
// _default represents default case for _chanselect.
extern LIBGOLANG_API const _selcase _default;
// _sema corresponds to sync.Sema
// no C-level analog is provided for sync.Mutex
typedef struct _sema _sema;
LIBGOLANG_API _sema *_makesema();
LIBGOLANG_API void _semafree(_sema *sema);
LIBGOLANG_API void _semaacquire(_sema *sema);
LIBGOLANG_API void _semarelease(_sema *sema);
// libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema;
......@@ -401,6 +411,44 @@ LIBGOLANG_API double now();
} // golang::time::
// golang::sync::
namespace sync {
// Sema provides semaphore.
class Sema {
_sema *_gsema;
public:
LIBGOLANG_API Sema();
LIBGOLANG_API ~Sema();
LIBGOLANG_API void acquire();
LIBGOLANG_API void release();
private:
Sema(const Sema&); // don't copy
Sema(Sema&&); // don't move
};
// Mutex provides mutex.
class Mutex {
Sema _sema;
public:
LIBGOLANG_API Mutex();
LIBGOLANG_API ~Mutex();
LIBGOLANG_API void lock();
LIBGOLANG_API void unlock();
private:
Mutex(const Mutex&); // don't copy
Mutex(Mutex&&); // don't move
};
} // golang::sync::
} // golang::
#endif // __cplusplus
......
......@@ -133,58 +133,61 @@ uint64_t _nanotime() {
// ---- semaphores ----
// (_sema = _libgolang_sema)
// Sema provides semaphore.
struct Sema {
_libgolang_sema *_gsema;
// _makesema creates new semaphore.
//
// it always returns !NULL and panics on memory allocation failue.
_sema *_makesema() {
_sema *sema = (_sema *)_runtime->sema_alloc();
if (!sema)
panic("makesema: alloc failed");
return sema;
}
Sema();
~Sema();
void acquire();
void release();
void _semafree(_sema *sema) {
_runtime->sema_free((_libgolang_sema*)sema);
}
private:
Sema(const Sema&); // don't copy
Sema(Sema&&); // don't move
};
void _semaacquire(_sema *sema) {
_runtime->sema_acquire((_libgolang_sema *)sema);
}
void _semarelease(_sema *sema) {
_runtime->sema_release((_libgolang_sema *)sema);
}
// golang::sync::
namespace sync {
Sema::Sema() {
Sema *sema = this;
sema->_gsema = _runtime->sema_alloc();
if (!sema->_gsema)
panic("sema: alloc failed");
sema->_gsema = _makesema();
}
Sema::~Sema() {
Sema *sema = this;
_runtime->sema_free(sema->_gsema);
_semafree(sema->_gsema);
sema->_gsema = NULL;
}
void Sema::acquire() {
Sema *sema = this;
_runtime->sema_acquire(sema->_gsema);
_semaacquire(sema->_gsema);
}
void Sema::release() {
Sema *sema = this;
_runtime->sema_release(sema->_gsema);
_semarelease(sema->_gsema);
}
// Mutex provides mutex.
// currently implemented via Sema.
struct Mutex {
void lock() { _sema.acquire(); }
void unlock() { _sema.release(); }
Mutex() {}
// Mutex is currently implemented via Sema.
Mutex::Mutex() {}
Mutex::~Mutex() {}
void Mutex::lock() { _sema.acquire(); }
void Mutex::unlock() { _sema.release(); }
private:
Sema _sema;
Mutex(const Mutex&); // don't copy
Mutex(Mutex&&); // don't move
};
} // golang::sync::
// with_lock mimics `with mu` from python.
// usage example:
......@@ -197,9 +200,9 @@ private:
_with_lock.once(); \
)
struct _with_lock_guard {
std::lock_guard<Mutex> mug;
std::lock_guard<sync::Mutex> mug;
bool done;
_with_lock_guard(Mutex &mu) : mug(mu), done(false) {}
_with_lock_guard(sync::Mutex &mu) : mug(mu), done(false) {}
bool once() { bool _ = !done; done = true; return _; }
};
......@@ -240,7 +243,7 @@ struct _chan {
unsigned _cap; // channel capacity (in elements)
unsigned _elemsize; // size of element
Mutex _mu;
sync::Mutex _mu;
list_head _recvq; // blocked receivers (_ -> _RecvSendWaiting.in_rxtxq)
list_head _sendq; // blocked senders (_ -> _RecvSendWaiting.in_rxtxq)
bool _closed;
......@@ -303,9 +306,9 @@ private:
//
// Only 1 waiter from the group can succeed waiting.
struct _WaitGroup {
Sema _sema; // used for wakeup
sync::Sema _sema; // used for wakeup
Mutex _mu; // lock NOTE ∀ chan order is always: chan._mu > ._mu
sync::Mutex _mu; // lock NOTE ∀ chan order is always: chan._mu > ._mu
// on wakeup: sender|receiver -> group:
// .which _{Send|Recv}Waiting instance which succeeded waiting.
const _RecvSendWaiting *which;
......@@ -423,7 +426,7 @@ _chan *_makechan(unsigned elemsize, unsigned size) {
ch = (_chan *)zalloc(sizeof(_chan) + size*elemsize);
if (ch == NULL)
panic("makechan: alloc failed");
new (&ch->_mu) Mutex();
new (&ch->_mu) sync::Mutex();
ch->_refcnt = 1;
ch->_cap = size;
......@@ -1151,7 +1154,7 @@ void _blockforever() {
// take a lock twice. It will forever block on the second lock attempt.
// Under gevent, similarly to Go, this raises "LoopExit: This operation
// would block forever", if there are no other greenlets scheduled to be run.
Sema dead;
sync::Sema dead;
dead.acquire();
dead.acquire();
bug("_blockforever: woken up");
......
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package sync mirrors Go package time.
See _sync.pxd for package documentation.
"""
# redirect cimport: golang.sync -> golang._sync (see __init__.pxd for rationale)
from golang._sync cimport *
......@@ -32,6 +32,11 @@ from golang import context
import six
from golang._sync import \
PySema as Sema, \
PyMutex as Mutex \
# Once allows to execute an action only once.
#
# For example:
......
......@@ -24,10 +24,66 @@ from golang import go, chan
from golang import sync, context, time
import threading
from pytest import raises
from golang.golang_test import panics
from golang.golang_test import import_pyx_tests, panics
from golang.time_test import dt
from six.moves import range as xrange
import six
import_pyx_tests("golang._sync_test")
def _test_mutex(mu, lock, unlock):
# verify that g2 mu.lock() blocks until g1 does mu.unlock()
getattr(mu, lock)()
l = []
done = chan()
def _():
getattr(mu, lock)()
l.append('b')
getattr(mu, unlock)()
done.close()
go(_)
time.sleep(1*dt)
l.append('a')
getattr(mu, unlock)()
done.recv()
assert l == ['a', 'b']
# the same via with
with mu:
l = []
done = chan()
def _():
with mu:
l.append('d')
done.close()
go(_)
time.sleep(1*dt)
l.append('c')
done.recv()
assert l == ['c', 'd']
def test_mutex(): _test_mutex(sync.Mutex(), 'lock', 'unlock')
def test_sema(): _test_mutex(sync.Sema(), 'acquire', 'release')
# verify that sema.acquire can be woken up by sema.release not from the same
# thread which did the original sema.acquire.
def test_sema_wakeup_different_thread():
sema = sync.Sema()
sema.acquire()
l = []
done = chan()
def _():
time.sleep(1*dt)
l.append('a')
sema.release()
done.close()
go(_)
sema.acquire()
l.append('b')
done.recv()
assert l == ['a', 'b']
def test_once():
once = sync.Once()
l = []
......
......@@ -214,6 +214,11 @@ setup(
'golang/runtime/libgolang_test_c.c',
'golang/runtime/libgolang_test.cpp']),
Ext('golang._sync',
['golang/_sync.pyx']),
Ext('golang._sync_test',
['golang/_sync_test.pyx']),
Ext('golang._time',
['golang/_time.pyx']),
],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment