Commit b316e504 authored by Kirill Smelkov's avatar Kirill Smelkov

sync: Move/Port WaitGroup to C++/Pyx nogil

Provide sync.WaitGroup that can be used directly from C++ and Pyx/nogil codes.
Python-level sync.WaitGroup becomes small wrapper around pyx/nogil one.

Python-level tests should be enough to cover C++/Pyx functionality at
zero-level approximation.
parent 0fb53e33
...@@ -36,3 +36,8 @@ cdef extern from "golang/libgolang.h" namespace "golang::sync" nogil: ...@@ -36,3 +36,8 @@ cdef extern from "golang/libgolang.h" namespace "golang::sync" nogil:
cppclass Once: cppclass Once:
void do "do_" (...) # ... = std::function void do "do_" (...) # ... = std::function
cppclass WaitGroup:
void done()
void add(int delta)
void wait()
...@@ -103,6 +103,27 @@ cdef void _pycall_fromnogil(PyObject *f) nogil except *: ...@@ -103,6 +103,27 @@ cdef void _pycall_fromnogil(PyObject *f) nogil except *:
(<object>f)() (<object>f)()
@final
cdef class PyWaitGroup:
"""WaitGroup allows to wait for collection of tasks to finish."""
cdef WaitGroup wg
# FIXME cannot catch/pyreraise panic of .wg ctor
# https://github.com/cython/cython/issues/3165
def done(PyWaitGroup pywg):
with nogil:
wg_done_pyexc(&pywg.wg)
def add(PyWaitGroup pywg, int delta):
with nogil:
wg_add_pyexc(&pywg.wg, delta)
def wait(PyWaitGroup pywg):
with nogil:
wg_wait_pyexc(&pywg.wg)
# ---- misc ---- # ---- misc ----
cdef nogil: cdef nogil:
...@@ -116,3 +137,10 @@ cdef nogil: ...@@ -116,3 +137,10 @@ cdef nogil:
mu.lock() mu.lock()
void mutexunlock_pyexc(Mutex *mu) except +topyexc: void mutexunlock_pyexc(Mutex *mu) except +topyexc:
mu.unlock() mu.unlock()
void wg_done_pyexc(WaitGroup *wg) except +topyexc:
wg.done()
void wg_add_pyexc(WaitGroup *wg, int delta) except +topyexc:
wg.add(delta)
void wg_wait_pyexc(WaitGroup *wg) except +topyexc:
wg.wait()
...@@ -530,6 +530,24 @@ private: ...@@ -530,6 +530,24 @@ private:
Once(Once&&); // don't move Once(Once&&); // don't move
}; };
// WaitGroup allows to wait for collection of tasks to finish.
class WaitGroup {
Mutex _mu;
int _count;
chan<structZ> _done; // closed & recreated every time ._count drops to 0
public:
LIBGOLANG_API WaitGroup();
LIBGOLANG_API ~WaitGroup();
LIBGOLANG_API void done();
LIBGOLANG_API void add(int delta);
LIBGOLANG_API void wait();
private:
WaitGroup(const WaitGroup&); // don't copy
WaitGroup(WaitGroup&&); // don't move
};
} // golang::sync:: } // golang::sync::
......
...@@ -1239,6 +1239,7 @@ double now() { ...@@ -1239,6 +1239,7 @@ double now() {
namespace golang { namespace golang {
namespace sync { namespace sync {
// Once
Once::Once() { Once::Once() {
Once *once = this; Once *once = this;
once->_done = false; once->_done = false;
...@@ -1259,6 +1260,55 @@ void Once::do_(const std::function<void(void)> &f) { ...@@ -1259,6 +1260,55 @@ void Once::do_(const std::function<void(void)> &f) {
} }
} }
// WaitGroup
WaitGroup::WaitGroup() {
WaitGroup& wg = *this;
wg._count = 0;
wg._done = makechan<structZ>();
}
WaitGroup::~WaitGroup() {}
void WaitGroup::done() {
WaitGroup& wg = *this;
wg.add(-1);
}
void WaitGroup::add(int delta) {
WaitGroup& wg = *this;
if (delta == 0)
return;
wg._mu.lock();
defer([&]() {
wg._mu.unlock();
});
wg._count += delta;
if (wg._count < 0)
panic("sync: negative WaitGroup counter");
if (wg._count == 0) {
wg._done.close();
wg._done = makechan<structZ>();
}
}
void WaitGroup::wait() {
WaitGroup& wg = *this;
chan<structZ> done = NULL;
wg._mu.lock();
if (wg._count != 0)
done = wg._done;
wg._mu.unlock();
if (done == NULL) // wg._count was =0
return;
done.recv();
}
}} // golang::sync:: }} // golang::sync::
// ---- misc ---- // ---- misc ----
......
...@@ -27,7 +27,7 @@ See the following link about Go sync package: ...@@ -27,7 +27,7 @@ See the following link about Go sync package:
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
import sys import sys
from golang import go, chan, defer, func, panic from golang import go, defer, func
from golang import context from golang import context
import six import six
...@@ -35,36 +35,8 @@ import six ...@@ -35,36 +35,8 @@ import six
from golang._sync import \ from golang._sync import \
PySema as Sema, \ PySema as Sema, \
PyMutex as Mutex, \ PyMutex as Mutex, \
PyOnce as Once \ PyOnce as Once, \
PyWaitGroup as WaitGroup \
# WaitGroup allows to wait for collection of tasks to finish.
class WaitGroup(object):
def __init__(wg):
wg._mu = Mutex()
wg._count = 0
wg._done = chan() # closed & recreated every time ._count drops to 0
def done(wg):
wg.add(-1)
def add(wg, delta):
if delta == 0:
return
with wg._mu:
wg._count += delta
if wg._count < 0:
panic("sync: negative WaitGroup counter")
if wg._count == 0:
wg._done.close()
wg._done = chan()
def wait(wg):
with wg._mu:
if wg._count == 0:
return
done = wg._done
done.recv()
# WorkGroup is a group of goroutines working on a common task. # WorkGroup is a group of goroutines working on a common task.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment