Commit 3b241983 by Kirill Smelkov

Port/move channels to C/C++/Pyx

- Move channels implementation to be done in C++ inside libgolang. The
  code and logic is based on previous Python-level channels
  implementation, but the new code is just C++ and does not depend on
  Python nor GIL at all, and so works without GIL if libgolang
  runtime works without GIL(*).

  (*) for example "thread" runtime works without GIL, while "gevent" runtime
      acquires GIL on every semaphore acquire.

  New channels implementation is located in δ(libgolang.cpp).

- Provide low-level C channels API to the implementation. The low-level
  C API was inspired by Libtask[1] and Plan9/Libthread[2].

  [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
  [2] http://9p.io/magic/man2html/2/thread.

- Provide high-level C++ channels API that provides type-safety and
  automatic channel lifetime management.

  Overview of C and C++ APIs are in δ(libgolang.h).

- Expose C++ channels API at Pyx level as Cython/nogil API so that Cython
  programs could use channels with ease and without need to care about
  lifetime management and low-level details.

  Overview of Cython/nogil channels API is in δ(README.rst) and
  δ(_golang.pxd).

- Turn Python channels to be tiny wrapper around chan<PyObject>.

Implementation note:

- gevent case needs special care because greenlet, which gevent uses,
  swaps coroutine stack from C stack to heap on coroutine park, and
  replaces that space on C stack with stack of activated coroutine
  copied back from heap. This way if an object on g's stack is accessed
  while g is parked it would be memory of another g's stack.

  The channels implementation explicitly cares about this issue so that
  stack -> * channel send, or * -> stack channel receive work correctly.

  It should be noted that greenlet approach, which it inherits from
  stackless, is not only a bit tricky, but also comes with overhead
  (stack <-> heap copy), and prevents a coroutine to migrate from 1 OS
  thread to another OS thread as that would change addresses of on-stack
  things for that coroutine.

  As the latter property prevents to use multiple CPUs even if the
  program / runtime are prepared to work without GIL, it would be more
  logical to change gevent/greenlet to use separate stack for each
  coroutine. That would remove stack <-> heap copy and the need for
  special care in channels implementation for stack - stack sends.
  Such approach should be possible to implement with e.g. swapcontext or
  similar mechanism, and a proof of concept of such work wrapped into
  greenlet-compatible API exists[3]. It would be good if at some point
  there would be a chance to explore such approach in Pygolang context.

  [3] https://github.com/python-greenlet/greenlet/issues/113#issuecomment-264529838 and below

Just this patch brings in the following speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               20.0µs ± 1%  15.6µs ± 1%  -21.84%  (p=0.000 n=10+10)
    chan             9.37µs ± 4%  2.89µs ± 6%  -69.12%  (p=0.000 n=10+10)
    select           20.2µs ± 4%   3.4µs ± 5%  -83.20%  (p=0.000 n=8+10)
    def              58.0ns ± 0%  60.0ns ± 0%   +3.45%  (p=0.000 n=8+10)
    func_def         43.8µs ± 1%  43.9µs ± 1%     ~     (p=0.796 n=10+10)
    call             62.4ns ± 1%  63.5ns ± 1%   +1.76%  (p=0.001 n=10+10)
    func_call        1.06µs ± 1%  1.05µs ± 1%   -0.63%  (p=0.002 n=10+10)
    try_finally       136ns ± 0%   137ns ± 0%   +0.74%  (p=0.000 n=9+10)
    defer            2.28µs ± 1%  2.33µs ± 1%   +2.34%  (p=0.000 n=10+10)
    workgroup_empty  48.2µs ± 1%  34.1µs ± 2%  -29.18%  (p=0.000 n=9+10)
    workgroup_raise  58.9µs ± 1%  45.5µs ± 1%  -22.74%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               24.7µs ± 1%  15.9µs ± 1%  -35.72%  (p=0.000 n=9+9)
    chan             11.6µs ± 1%   7.3µs ± 1%  -36.74%  (p=0.000 n=10+10)
    select           22.5µs ± 1%  10.4µs ± 1%  -53.73%  (p=0.000 n=10+10)
    def              55.0ns ± 0%  55.0ns ± 0%     ~     (all equal)
    func_def         43.6µs ± 1%  43.6µs ± 1%     ~     (p=0.684 n=10+10)
    call             63.0ns ± 0%  64.0ns ± 0%   +1.59%  (p=0.000 n=10+10)
    func_call        1.06µs ± 1%  1.07µs ± 1%   +0.45%  (p=0.045 n=10+9)
    try_finally       135ns ± 0%   137ns ± 0%   +1.48%  (p=0.000 n=10+10)
    defer            2.31µs ± 1%  2.33µs ± 1%   +0.89%  (p=0.000 n=10+10)
    workgroup_empty  70.2µs ± 0%  55.8µs ± 0%  -20.63%  (p=0.000 n=10+10)
    workgroup_raise  90.3µs ± 0%  70.9µs ± 1%  -21.51%  (p=0.000 n=9+10)

The whole Cython/nogil work - starting from 8fa3c15b (Start using Cython
and providing Cython/nogil API) to this patch - brings in the following
speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               92.9µs ± 1%  15.6µs ± 1%  -83.16%  (p=0.000 n=10+10)
    chan             13.9µs ± 1%   2.9µs ± 6%  -79.14%  (p=0.000 n=10+10)
    select           29.7µs ± 6%   3.4µs ± 5%  -88.55%  (p=0.000 n=10+10)
    def              57.0ns ± 0%  60.0ns ± 0%   +5.26%  (p=0.000 n=10+10)
    func_def         44.0µs ± 1%  43.9µs ± 1%     ~     (p=0.055 n=10+10)
    call             63.5ns ± 1%  63.5ns ± 1%     ~     (p=1.000 n=10+10)
    func_call        1.06µs ± 0%  1.05µs ± 1%   -1.31%  (p=0.000 n=10+10)
    try_finally       139ns ± 0%   137ns ± 0%   -1.44%  (p=0.000 n=10+10)
    defer            2.36µs ± 1%  2.33µs ± 1%   -1.26%  (p=0.000 n=10+10)
    workgroup_empty  98.4µs ± 1%  34.1µs ± 2%  -65.32%  (p=0.000 n=10+10)
    workgroup_raise   135µs ± 1%    46µs ± 1%  -66.35%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               68.8µs ± 1%  15.9µs ± 1%  -76.91%  (p=0.000 n=10+9)
    chan             14.8µs ± 1%   7.3µs ± 1%  -50.67%  (p=0.000 n=10+10)
    select           32.0µs ± 0%  10.4µs ± 1%  -67.57%  (p=0.000 n=10+10)
    def              58.0ns ± 0%  55.0ns ± 0%   -5.17%  (p=0.000 n=10+10)
    func_def         43.9µs ± 1%  43.6µs ± 1%   -0.53%  (p=0.035 n=10+10)
    call             63.5ns ± 1%  64.0ns ± 0%   +0.79%  (p=0.033 n=10+10)
    func_call        1.08µs ± 1%  1.07µs ± 1%   -1.74%  (p=0.000 n=10+9)
    try_finally       142ns ± 0%   137ns ± 0%   -3.52%  (p=0.000 n=10+10)
    defer            2.32µs ± 1%  2.33µs ± 1%   +0.71%  (p=0.005 n=10+10)
    workgroup_empty  90.3µs ± 0%  55.8µs ± 0%  -38.26%  (p=0.000 n=10+10)
    workgroup_raise   108µs ± 1%    71µs ± 1%  -34.64%  (p=0.000 n=10+10)

This patch is the final patch in series to reach the goal of providing
channels that could be used in Cython/nogil code.

Cython/nogil channels work is dedicated to the memory of Вера Павловна Супрун[4].

[4] https://navytux.spb.ru/memory/%D0%A2%D1%91%D1%82%D1%8F%20%D0%92%D0%B5%D1%80%D0%B0.pdf#page=3
1 parent 9efb6575
......@@ -170,7 +170,7 @@ located in `src/` under `$GOPATH`.
Cython/nogil API
----------------
Cython package `golang` provides *nogil* API with goroutines and
Cython package `golang` provides *nogil* API with goroutines, channels and
other features that mirror corresponding Python package. Cython API is not only
faster compared to Python version, but also, due to *nogil* property, allows to
build concurrent systems without limitations imposed by Python's GIL. All that
......@@ -178,14 +178,59 @@ while still programming in Python-like language. Brief description of
Cython/nogil API follows:
`go` spawns new task - a coroutine, or thread, depending on activated runtime.
For example::
`chan[T]` represents a channel with Go semantic and elements of type `T`.
Use `makechan[T]` to create new channel, and `chan[T].recv`, `chan[T].send`,
`chan[T].close` for communication. `nil` stands for the nil channel. `select`
can be used to multiplex on several channels. For example::
cdef nogil:
void worker():
pass
struct Point:
int x
int y
void worker(chan[int] chi, chan[Point] chp):
chi.send(1)
cdef Point p
p.x = 3
p.y = 4
chp.send(p)
void myfunc():
go(worker)
cdef chan[int] chi = makechan[int]() # synchronous channel of integers
cdef chan[Point] chp = makechan[Point](3) # channel with buffer of size 3 and Point elements
go(worker, chi, chp)
i = chi.recv() # will give 1
p = chp.recv() # will give Point(3,4)
chp = nil # rebind chp to nil channel
cdef cbool ok
cdef int j = 33
_ = select([
chi.recvs(&i) # 0
chi.recvs(&i, &ok), # 1
chi.sends(&j), # 2
chp.recvs(&p), # 3
default, # 4
])
if _ == 0:
# i is what was received from chi
...
if _ == 1:
# (i, ok) is what was received from chi
...
if _ == 2:
# we know j was sent to chi
...
if _ == 3:
# this case will be never selected because
# send/recv on nil channel block forever.
...
if _ == 4:
# default case
...
`panic` stops normal execution of current goroutine by throwing a C-level
exception. On Python/C boundaries C-level exceptions have to be converted to
......
......@@ -23,6 +23,7 @@ Cython/nogil API
----------------
- `go` spawns lightweight thread.
- `chan[T]`, `makechan[T]` and `select` provide C-level channels with Go semantic.
- `panic` stops normal execution of current goroutine by throwing a C-level exception.
Everything in Cython/nogil API do not depend on Python runtime and in
......@@ -38,10 +39,16 @@ Golang.py runtime
In addition to Cython/nogil API, golang.pyx provides runtime for golang.py:
- Python-level channels are represented by pychan + pyselect.
- Python-level panic is represented by pypanic.
"""
from libcpp cimport nullptr_t, nullptr as nil
from libcpp.utility cimport pair
cdef extern from *:
ctypedef bint cbool "bool"
# nogil pyx-level golang API.
#
# NOTE even though many functions may panic (= throw C++ exception) nothing is
......@@ -58,14 +65,63 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
void go(...) # typechecking is done by C
struct _chan
cppclass chan[T]:
chan();
# send/recv/close
void send(const T&)
T recv()
pair[T, cbool] recv_()
void close()
# send/recv in select
_selcase sends(const T *ptx)
_selcase recvs()
_selcase recvs(T* prx)
_selcase recvs(T* prx, cbool *pok)
# length/capacity
unsigned len()
unsigned cap()
# compare wrt nil; =nil
cbool operator==(nullptr_t)
cbool operator!=(nullptr_t)
void operator=(nullptr_t)
# for tests
_chan *_rawchan()
chan[T] makechan[T]()
chan[T] makechan[T](unsigned size)
struct structZ:
pass
enum _chanop:
_CHANSEND
_CHANRECV
_DEFAULT
struct _selcase:
_chanop op
void *data
cbool *rxok
const _selcase default "golang::_default"
int select(_selcase casev[])
# ---- python bits ----
cdef void topyexc() except *
cpdef pypanic(arg)
# pychan is chan<object>
from cpython cimport PyObject
ctypedef PyObject *pPyObject # https://github.com/cython/cython/issues/534
from cython cimport final
@final
cdef class pychan:
cdef dict __dict__
cdef chan[pPyObject] ch
......@@ -33,10 +33,15 @@ from __future__ import print_function, absolute_import
_init_libgolang()
from cpython cimport Py_INCREF, Py_DECREF, PY_MAJOR_VERSION
cdef extern from "Python.h":
ctypedef struct PyTupleObject:
PyObject **ob_item
void Py_FatalError(const char *msg)
from libcpp.vector cimport vector
from cython cimport final
import sys
import threading, collections, random
# ---- panic ----
......@@ -136,311 +141,98 @@ cdef void __goviac(void *arg) nogil:
# ---- channels ----
# _RecvWaiting represents a receiver waiting on a chan.
class _RecvWaiting(object):
# .group _WaitGroup group of waiters this receiver is part of
# .chan chan channel receiver is waiting on
#
# on wakeup: sender|closer -> receiver:
# .rx_ rx_ for recv
def __init__(self, group, ch):
self.group = group
self.chan = ch
group.register(self)
# wakeup notifies waiting receiver that recv_ completed.
def wakeup(self, rx, ok):
self.rx_ = (rx, ok)
self.group.wakeup()
# _SendWaiting represents a sender waiting on a chan.
class _SendWaiting(object):
# .group _WaitGroup group of waiters this sender is part of
# .chan chan channel sender is waiting on
# .obj object that was passed to send
#
# on wakeup: receiver|closer -> sender:
# .ok bool whether send succeeded (it will not on close)
def __init__(self, group, ch, obj):
self.group = group
self.chan = ch
self.obj = obj
group.register(self)
# wakeup notifies waiting sender that send completed.
def wakeup(self, ok):
self.ok = ok
self.group.wakeup()
# _WaitGroup is a group of waiting senders and receivers.
#
# Only 1 waiter from the group can succeed waiting.
class _WaitGroup(object):
# ._waitv [] of _{Send|Recv}Waiting
# ._sema semaphore used for wakeup
#
# ._mu lock NOTE ∀ chan order is always: chan._mu > ._mu
#
# on wakeup: sender|receiver -> group:
# .which _{Send|Recv}Waiting instance which succeeded waiting.
def __init__(self):
self._waitv = []
self._sema = threading.Lock() # in python it is valid to release lock from another thread.
self._sema.acquire()
self._mu = threading.Lock()
self.which = None
def register(self, wait):
self._waitv.append(wait)
# try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
#
# -> ok: true if won, false - if not.
def try_to_win(self, waiter):
with self._mu:
if self.which is not None:
return False
else:
self.which = waiter
return True
# wait waits for winning case of group to complete.
def wait(self):
self._sema.acquire()
# wakeup wakes up the group.
#
# prior to wakeup try_to_win must have been called.
# in practice this means that waiters queued to chan.{_send|_recv}q must
# be dequeued with _dequeWaiter.
def wakeup(self):
assert self.which is not None
self._sema.release()
# dequeAll removes all registered waiters from their wait queues.
def dequeAll(self):
for w in self._waitv:
ch = w.chan
if isinstance(w, _SendWaiting):
queue = ch._sendq
else:
assert isinstance(w, _RecvWaiting)
queue = ch._recvq
with ch._mu:
try:
queue.remove(w)
except ValueError:
pass
# _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
#
# the channel owning {_recv|_send}q must be locked.
def _dequeWaiter(queue):
while len(queue) > 0:
w = queue.popleft()
# if this waiter can win its group - return it.
# if not - someone else from its group already has won, and so we anyway have
# to remove the waiter from the queue.
if w.group.try_to_win(w):
return w
return None
# pychan is Python channel with Go semantic.
# pychan is chan<object>.
@final
cdef class pychan:
# ._cap channel capacity
# ._mu lock
# ._dataq deque *: data buffer
# ._recvq deque _RecvWaiting: blocked receivers
# ._sendq deque _SendWaiting: blocked senders
# ._closed bool
def __init__(ch, size=0):
ch._cap = size
ch._mu = threading.Lock()
ch._dataq = collections.deque()
ch._recvq = collections.deque()
ch._sendq = collections.deque()
ch._closed = False
# send sends object to a receiver.
def send(ch, obj):
if ch is pynilchan:
_blockforever()
ch._mu.acquire()
if 1:
ok = ch._trysend(obj)
if ok:
return
def __cinit__(pych, size=0):
pych.ch = makechan_pyobj_pyexc(size)
def __dealloc__(pych):
# on del: drain buffered channel to decref sent objects.
# verify that the channel is not connected anywhere outside us.
# (if it was present also somewhere else - draining would be incorrect)
if pych.ch == nil:
return
cdef int refcnt = _chanrefcnt(pych.ch._rawchan())
if refcnt != 1:
# cannot raise py-level exception in __dealloc__
Py_FatalError("pychan.__dealloc__: chan.refcnt=%d ; must be =1" % refcnt)
cdef chan[pPyObject] ch = pych.ch
pych.ch = nil # does _chanxdecref(ch)
cdef PyObject *_rx
while ch.len() != 0:
# NOTE *not* chanrecv_pyexc(ch):
# - recv must not block and must not panic as we verified that we
# are the only holder of the channel and that ch buffer is not empty.
# - even if recv panics, we cannot convert that panic to python
# exception in __dealloc__. So if it really panics - let the
# panic make it and crash the process similarly to Py_FatalError above.
_rx = ch.recv()
Py_DECREF(<object>_rx)
# ch is decref'ed automatically at return
g = _WaitGroup()
me = _SendWaiting(g, ch, obj)
ch._sendq.append(me)
ch._mu.release()
# send sends object to a receiver.
def send(pych, obj):
# increment obj reference count - until received the channel is holding pointer to the object.
Py_INCREF(obj)
g.wait()
assert g.which is me
if not me.ok:
pypanic("send on closed channel")
try:
with nogil:
chansend_pyexc(pych.ch, <PyObject *>obj)
except: # not only _PanicError as send can also throw e.g. bad_alloc
# the object was not sent - e.g. it was "send on a closed channel"
Py_DECREF(obj)
raise
# recv_ is "comma-ok" version of recv.
#
# ok is true - if receive was delivered by a successful send.
# ok is false - if receive is due to channel being closed and empty.
def recv_(ch): # -> (rx, ok)
if ch is pynilchan:
_blockforever()
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
return rx_
def recv_(pych): # -> (rx, ok)
cdef PyObject *_rx = NULL
cdef bint ok
g = _WaitGroup()
me = _RecvWaiting(g, ch)
ch._recvq.append(me)
with nogil:
_rx, ok = chanrecv__pyexc(pych.ch)
ch._mu.release()
if not ok:
return (None, ok)
g.wait()
assert g.which is me
return me.rx_
# we received the object and the channel dropped pointer to it.
rx = <object>_rx
Py_DECREF(rx)
return (rx, ok)
# recv receives from the channel.
def recv(ch): # -> rx
rx, _ = ch.recv_()
def recv(pych): # -> rx
rx, _ = pych.recv_() # TODO call recv_ via C
return rx
# _trysend(obj) -> ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _trysend(ch, obj):
if ch._closed:
ch._mu.release()
pypanic("send on closed channel")
# synchronous channel
if ch._cap == 0:
recv = _dequeWaiter(ch._recvq)
if recv is None:
return False
ch._mu.release()
recv.wakeup(obj, True)
return True
# buffered channel
else:
if len(ch._dataq) >= ch._cap:
return False
ch._dataq.append(obj)
recv = _dequeWaiter(ch._recvq)
if recv is not None:
rx = ch._dataq.popleft()
ch._mu.release()
recv.wakeup(rx, True)
else:
ch._mu.release()
return True
# _tryrecv() -> rx_=(rx, ok), ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _tryrecv(ch):
# buffered
if len(ch._dataq) > 0:
rx = ch._dataq.popleft()
# wakeup a blocked writer, if there is any
send = _dequeWaiter(ch._sendq)
if send is not None:
ch._dataq.append(send.obj)
ch._mu.release()
send.wakeup(True)
else:
ch._mu.release()
return (rx, True), True
# closed
if ch._closed:
ch._mu.release()
return (None, False), True
# sync | empty: there is waiting writer
send = _dequeWaiter(ch._sendq)
if send is None:
return (None, False), False
ch._mu.release()
rx = send.obj
send.wakeup(True)
return (rx, True), True
# close closes sending side of the channel.
def close(pych):
with nogil:
chanclose_pyexc(pych.ch)
def __len__(pych):
return chanlen_pyexc(pych.ch)
# close closes sending side of the channel.
def close(ch):
if ch is pynilchan:
pypanic("close of nil channel")
recvv = []
sendv = []
with ch._mu:
if ch._closed:
pypanic("close of closed channel")
ch._closed = True
# schedule: wake-up all readers
while 1:
recv = _dequeWaiter(ch._recvq)
if recv is None:
break
recvv.append(recv)
# schedule: wake-up all writers (they will panic)
while 1:
send = _dequeWaiter(ch._sendq)
if send is None:
break
sendv.append(send)
# perform scheduled wakeups outside of ._mu
for recv in recvv:
recv.wakeup(None, False)
for send in sendv:
send.wakeup(False)
def __len__(ch):
return len(ch._dataq)
def __repr__(ch):
if ch is pynilchan:
def __repr__(pych):
if pych.ch == nil:
return "nilchan"
else:
return super(pychan, ch).__repr__()
return super(pychan, pych).__repr__()
# pynilchan is the nil py channel.
#
# On nil channel: send/recv block forever; close panics.
pynilchan = pychan(None) # TODO -> <chan*>(NULL) after move to Cython
cdef pychan _pynilchan = pychan()
_pynilchan.ch = chan[pPyObject]() # = NULL
pynilchan = _pynilchan
# pydefault represents default case for pyselect.
......@@ -474,164 +266,90 @@ pydefault = object()
# # default case
# ...
def pyselect(*pycasev):
# select promise: if multiple cases are ready - one will be selected randomly
npycasev = list(enumerate(pycasev))
random.shuffle(npycasev)
# first pass: poll all cases and bail out in the end if default was provided
recvv = [] # [](n, ch, commaok)
sendv = [] # [](n, ch, tx)
ndefault = None
for (n, pycase) in npycasev:
# default: remember we have it
cdef int i, n = len(pycasev), selected
cdef vector[_selcase] casev = vector[_selcase](n)
cdef pychan pych
cdef PyObject *_rx = NULL # all select recvs are setup to receive into _rx
cdef cbool rxok = False # (its ok as only one receive will be actually executed)
# prepare casev for chanselect
for i in range(n):
pycase = pycasev[i]
# default
if pycase is pydefault:
if ndefault is not None:
pypanic("pyselect: multiple default")
ndefault = n
casev[i] = default
# send
elif type(pycase) is tuple:
if len(pycase) != 2:
pypanic("pyselect: invalid [%d]() case" % len(pycase))
_tcase = <PyTupleObject *>pycase
pysend, tx = pycase
pysend = <object>(_tcase.ob_item[0])
if pysend.__self__.__class__ is not pychan:
pypanic("pyselect: send on non-chan: %r" % (pysend.__self__.__class__,))
ch = pysend.__self__
pych = pysend.__self__
if pysend.__name__ != "send": # XXX better check PyCFunction directly
pypanic("pyselect: send expected: %r" % (pysend,))
if ch is not pynilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
ok = ch._trysend(tx)
if ok:
return n, None
ch._mu.release()
# wire ptx through pycase[1]
p_tx = &(_tcase.ob_item[1])
tx = <object>(p_tx[0])
sendv.append((n, ch, tx))
# incref tx as if corresponding channel is holding pointer to the object while it is being sent.
# we'll decref the object if it won't be sent.
# see pychan.send for details.
Py_INCREF(tx)
casev[i] = pych.ch.sends(p_tx)
# recv
else:
pyrecv = pycase
if pyrecv.__self__.__class__ is not pychan:
pypanic("pyselect: recv on non-chan: %r" % (pyrecv.__self__.__class__,))
ch = pyrecv.__self__
pych = pyrecv.__self__
if pyrecv.__name__ == "recv": # XXX better check PyCFunction directly
commaok = False
casev[i] = pych.ch.recvs(&_rx)
elif pyrecv.__name__ == "recv_": # XXX better check PyCFunction directly
commaok = True
casev[i] = pych.ch.recvs(&_rx, &rxok)
else:
pypanic("pyselect: recv expected: %r" % (pyrecv,))
if ch is not pynilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
ch._mu.release()
recvv.append((n, ch, commaok))
# execute default if we have it
if ndefault is not None:
return ndefault, None
# select{} or with nil-channels only -> block forever
if len(recvv) + len(sendv) == 0:
_blockforever()
# second pass: subscribe and wait on all rx/tx cases
g = _WaitGroup()
# selected returns what was selected in g.
# the return signature is the one of select.
def selected():
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
pypanic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
selected = -1
try:
for n, ch, tx in sendv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
ok = ch._trysend(tx)
if ok:
# don't let already queued cases win
g.which = "tx prepoll won" # !None
return n, None
w = _SendWaiting(g, ch, tx)
w.sel_n = n
ch._sendq.append(w)
ch._mu.release()
for n, ch, commaok in recvv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
rx_, ok = ch._tryrecv()
if ok:
# don't let already queued cases win
g.which = "rx prepoll won" # !None
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
w = _RecvWaiting(g, ch)
w.sel_n = n
w.sel_commaok = commaok
ch._recvq.append(w)
ch._mu.release()
return selected()
with nogil:
selected = _chanselect_pyexc(&casev[0], casev.size())
finally:
# unsubscribe not-succeeded waiters
g.dequeAll()
# _blockforever blocks current goroutine forever.
_tblockforever = None
def _blockforever():
if _tblockforever is not None:
_tblockforever()
# take a lock twice. It will forever block on the second lock attempt.
# Under gevent, similarly to Go, this raises "LoopExit: This operation
# would block forever", if there are no other greenlets scheduled to be run.
dead = threading.Lock()
dead.acquire()
dead.acquire()
# decref not sent tx (see ^^^ send prepare)
for i in range(n):
if casev[i].op == _CHANSEND and (i != selected):
p_tx = <PyObject **>casev[i].data
_tx = p_tx[0]
tx = <object>_tx
Py_DECREF(tx)
# return what was selected
cdef _chanop op = casev[selected].op
if op == _DEFAULT:
return selected, None
if op == _CHANSEND:
return selected, None
if op != _CHANRECV:
raise AssertionError("pyselect: chanselect returned with bad op")
# we received NULL or the object; if it is object, corresponding channel
# dropped pointer to it (see pychan.recv_ for details).
cdef object rx = None
if _rx != NULL:
rx = <object>_rx
Py_DECREF(rx)
if casev[selected].rxok != NULL:
return selected, (rx, rxok)
else:
return selected, rx
# ---- init libgolang runtime ---
......@@ -668,9 +386,30 @@ cdef void _init_libgolang() except*:
# ---- misc ----
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _chanrefcnt(_chan *ch)
int _chanselect(_selcase *casev, int casec)
void _taskgo(void (*f)(void *), void *arg)
cdef nogil:
chan[pPyObject] makechan_pyobj_pyexc(unsigned size) except +topyexc:
return makechan[pPyObject](size)
void chansend_pyexc(chan[pPyObject] ch, PyObject *_tx) except +topyexc:
ch.send(_tx)
(PyObject*, bint) chanrecv__pyexc(chan[pPyObject] ch) except +topyexc:
_ = ch.recv_()
return (_.first, _.second) # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
void chanclose_pyexc(chan[pPyObject] ch) except +topyexc:
ch.close()
unsigned chanlen_pyexc(chan[pPyObject] ch) except +topyexc:
return ch.len()
int _chanselect_pyexc(const _selcase *casev, int casec) except +topyexc:
return _chanselect(casev, casec)
void _taskgo_pyexc(void (*f)(void *) nogil, void *arg) except +topyexc:
_taskgo(f, arg)
......@@ -26,21 +26,30 @@
from __future__ import print_function, absolute_import
from golang cimport go, pychan, panic, pypanic, topyexc
from golang import nilchan
from golang import _golang
from golang cimport go, chan, _chan, makechan, pychan, nil, select, \
default, structZ, panic, pypanic, topyexc, cbool
from golang import time
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _tchanrecvqlen(_chan *ch)
int _tchansendqlen(_chan *ch)
void (*_tblockforever)()
# pylen_{recv,send}q returns len(pych._{recv,send}q)
# pylen_{recv,send}q returns len(_chan._{recv,send}q)
def pylen_recvq(pychan pych not None): # -> int
if pych is nilchan:
if pych.ch == nil:
raise AssertionError('len(.recvq) on nil channel')
return len(pych._recvq)
return _tchanrecvqlen(pych.ch._rawchan())
def pylen_sendq(pychan pych not None): # -> int
if pych is nilchan:
if pych.ch == nil:
raise AssertionError('len(.sendq) on nil channel')
return len(pych._sendq)
return _tchansendqlen(pych.ch._rawchan())
# runtime/libgolang_test.cpp
cdef extern from *:
"""
extern void waitBlocked(golang::_chan *ch, bool rx, bool tx);
"""
void waitBlocked(_chan *, bint rx, bint tx) nogil except +topyexc
# pywaitBlocked waits till a receive or send pychan operation blocks waiting on the channel.
#
......@@ -49,7 +58,8 @@ def pywaitBlocked(pychanop):
if pychanop.__self__.__class__ is not pychan:
pypanic("wait blocked: %r is method of a non-chan: %r" % (pychanop, pychanop.__self__.__class__))
cdef pychan pych = pychanop.__self__
recv = send = False
cdef bint recv = False
cdef bint send = False
if pychanop.__name__ == "recv": # XXX better check PyCFunction directly
recv = True
elif pychanop.__name__ == "send": # XXX better check PyCFunction directly
......@@ -57,41 +67,84 @@ def pywaitBlocked(pychanop):
else:
pypanic("wait blocked: unexpected chan method: %r" % (pychanop,))
t0 = time.now()
while 1:
with pych._mu:
if recv and pylen_recvq(pych) > 0:
return
if send and pylen_sendq(pych) > 0:
return
now = time.now()
if now-t0 > 10: # waited > 10 seconds - likely deadlock
pypanic("deadlock")
time.sleep(0) # yield to another thread / coroutine
with nogil:
waitBlocked(pych.ch._rawchan(), recv, send)
# `with pypanicWhenBlocked` hooks into _golang._blockforever to raise panic with
# `with pypanicWhenBlocked` hooks into libgolang _blockforever to raise panic with
# "t: blocks forever" instead of blocking.
cdef class pypanicWhenBlocked:
def __enter__(t):
assert _golang._tblockforever is None
_golang._tblockforever = _panicblocked
global _tblockforever
_tblockforever = _panicblocked
return t
def __exit__(t, typ, val, tb):
_golang._tblockforever = None
def _panicblocked():
pypanic("t: blocks forever")
_tblockforever = NULL
cdef void _panicblocked() nogil:
panic("t: blocks forever")
# small test to verify pyx(nogil) channels.
ctypedef struct Point:
int x
int y
# TODO kill this and teach Cython to coerce pair[X,Y] -> (X,Y)
cdef (int, cbool) recv_(chan[int] ch) nogil:
_ = ch.recv_()
return (_.first, _.second)
cdef void _test_chan_nogil() nogil except +topyexc:
cdef chan[structZ] done = makechan[structZ]()
cdef chan[int] chi = makechan[int](1)
cdef chan[Point] chp = makechan[Point]()
chp = nil # reset to nil
cdef int i, j
cdef Point p
cdef cbool jok
i=+1; chi.send(i)
j=-1; j = chi.recv()
if not (j == i):
panic("send -> recv != I")
i = 2
_=select([
done.recvs(), # 0
chi.sends(&i), # 1
chp.recvs(&p), # 2
chi.recvs(&j, &jok), # 3
default, # 4
])
if _ != 1:
panic("select: selected !1")
j, jok = recv_(chi)
if not (j == 2 and jok == True):
panic("recv_ != (2, true)")
chi.close()
j, jok = recv_(chi)
if not (j == 0 and jok == False):
panic("recv_ from closed != (0, false)")
def test_chan_nogil():
with nogil:
_test_chan_nogil()