Commit 3b241983 authored by Kirill Smelkov's avatar Kirill Smelkov

Port/move channels to C/C++/Pyx

- Move channels implementation to be done in C++ inside libgolang. The
  code and logic is based on previous Python-level channels
  implementation, but the new code is just C++ and does not depend on
  Python nor GIL at all, and so works without GIL if libgolang
  runtime works without GIL(*).

  (*) for example "thread" runtime works without GIL, while "gevent" runtime
      acquires GIL on every semaphore acquire.

  New channels implementation is located in δ(libgolang.cpp).

- Provide low-level C channels API to the implementation. The low-level
  C API was inspired by Libtask[1] and Plan9/Libthread[2].

  [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
  [2] http://9p.io/magic/man2html/2/thread.

- Provide high-level C++ channels API that provides type-safety and
  automatic channel lifetime management.

  Overview of C and C++ APIs are in δ(libgolang.h).

- Expose C++ channels API at Pyx level as Cython/nogil API so that Cython
  programs could use channels with ease and without need to care about
  lifetime management and low-level details.

  Overview of Cython/nogil channels API is in δ(README.rst) and
  δ(_golang.pxd).

- Turn Python channels to be tiny wrapper around chan<PyObject>.

Implementation note:

- gevent case needs special care because greenlet, which gevent uses,
  swaps coroutine stack from C stack to heap on coroutine park, and
  replaces that space on C stack with stack of activated coroutine
  copied back from heap. This way if an object on g's stack is accessed
  while g is parked it would be memory of another g's stack.

  The channels implementation explicitly cares about this issue so that
  stack -> * channel send, or * -> stack channel receive work correctly.

  It should be noted that greenlet approach, which it inherits from
  stackless, is not only a bit tricky, but also comes with overhead
  (stack <-> heap copy), and prevents a coroutine to migrate from 1 OS
  thread to another OS thread as that would change addresses of on-stack
  things for that coroutine.

  As the latter property prevents to use multiple CPUs even if the
  program / runtime are prepared to work without GIL, it would be more
  logical to change gevent/greenlet to use separate stack for each
  coroutine. That would remove stack <-> heap copy and the need for
  special care in channels implementation for stack - stack sends.
  Such approach should be possible to implement with e.g. swapcontext or
  similar mechanism, and a proof of concept of such work wrapped into
  greenlet-compatible API exists[3]. It would be good if at some point
  there would be a chance to explore such approach in Pygolang context.

  [3] https://github.com/python-greenlet/greenlet/issues/113#issuecomment-264529838 and below

Just this patch brings in the following speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               20.0µs ± 1%  15.6µs ± 1%  -21.84%  (p=0.000 n=10+10)
    chan             9.37µs ± 4%  2.89µs ± 6%  -69.12%  (p=0.000 n=10+10)
    select           20.2µs ± 4%   3.4µs ± 5%  -83.20%  (p=0.000 n=8+10)
    def              58.0ns ± 0%  60.0ns ± 0%   +3.45%  (p=0.000 n=8+10)
    func_def         43.8µs ± 1%  43.9µs ± 1%     ~     (p=0.796 n=10+10)
    call             62.4ns ± 1%  63.5ns ± 1%   +1.76%  (p=0.001 n=10+10)
    func_call        1.06µs ± 1%  1.05µs ± 1%   -0.63%  (p=0.002 n=10+10)
    try_finally       136ns ± 0%   137ns ± 0%   +0.74%  (p=0.000 n=9+10)
    defer            2.28µs ± 1%  2.33µs ± 1%   +2.34%  (p=0.000 n=10+10)
    workgroup_empty  48.2µs ± 1%  34.1µs ± 2%  -29.18%  (p=0.000 n=9+10)
    workgroup_raise  58.9µs ± 1%  45.5µs ± 1%  -22.74%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               24.7µs ± 1%  15.9µs ± 1%  -35.72%  (p=0.000 n=9+9)
    chan             11.6µs ± 1%   7.3µs ± 1%  -36.74%  (p=0.000 n=10+10)
    select           22.5µs ± 1%  10.4µs ± 1%  -53.73%  (p=0.000 n=10+10)
    def              55.0ns ± 0%  55.0ns ± 0%     ~     (all equal)
    func_def         43.6µs ± 1%  43.6µs ± 1%     ~     (p=0.684 n=10+10)
    call             63.0ns ± 0%  64.0ns ± 0%   +1.59%  (p=0.000 n=10+10)
    func_call        1.06µs ± 1%  1.07µs ± 1%   +0.45%  (p=0.045 n=10+9)
    try_finally       135ns ± 0%   137ns ± 0%   +1.48%  (p=0.000 n=10+10)
    defer            2.31µs ± 1%  2.33µs ± 1%   +0.89%  (p=0.000 n=10+10)
    workgroup_empty  70.2µs ± 0%  55.8µs ± 0%  -20.63%  (p=0.000 n=10+10)
    workgroup_raise  90.3µs ± 0%  70.9µs ± 1%  -21.51%  (p=0.000 n=9+10)

The whole Cython/nogil work - starting from 8fa3c15b (Start using Cython
and providing Cython/nogil API) to this patch - brings in the following
speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               92.9µs ± 1%  15.6µs ± 1%  -83.16%  (p=0.000 n=10+10)
    chan             13.9µs ± 1%   2.9µs ± 6%  -79.14%  (p=0.000 n=10+10)
    select           29.7µs ± 6%   3.4µs ± 5%  -88.55%  (p=0.000 n=10+10)
    def              57.0ns ± 0%  60.0ns ± 0%   +5.26%  (p=0.000 n=10+10)
    func_def         44.0µs ± 1%  43.9µs ± 1%     ~     (p=0.055 n=10+10)
    call             63.5ns ± 1%  63.5ns ± 1%     ~     (p=1.000 n=10+10)
    func_call        1.06µs ± 0%  1.05µs ± 1%   -1.31%  (p=0.000 n=10+10)
    try_finally       139ns ± 0%   137ns ± 0%   -1.44%  (p=0.000 n=10+10)
    defer            2.36µs ± 1%  2.33µs ± 1%   -1.26%  (p=0.000 n=10+10)
    workgroup_empty  98.4µs ± 1%  34.1µs ± 2%  -65.32%  (p=0.000 n=10+10)
    workgroup_raise   135µs ± 1%    46µs ± 1%  -66.35%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               68.8µs ± 1%  15.9µs ± 1%  -76.91%  (p=0.000 n=10+9)
    chan             14.8µs ± 1%   7.3µs ± 1%  -50.67%  (p=0.000 n=10+10)
    select           32.0µs ± 0%  10.4µs ± 1%  -67.57%  (p=0.000 n=10+10)
    def              58.0ns ± 0%  55.0ns ± 0%   -5.17%  (p=0.000 n=10+10)
    func_def         43.9µs ± 1%  43.6µs ± 1%   -0.53%  (p=0.035 n=10+10)
    call             63.5ns ± 1%  64.0ns ± 0%   +0.79%  (p=0.033 n=10+10)
    func_call        1.08µs ± 1%  1.07µs ± 1%   -1.74%  (p=0.000 n=10+9)
    try_finally       142ns ± 0%   137ns ± 0%   -3.52%  (p=0.000 n=10+10)
    defer            2.32µs ± 1%  2.33µs ± 1%   +0.71%  (p=0.005 n=10+10)
    workgroup_empty  90.3µs ± 0%  55.8µs ± 0%  -38.26%  (p=0.000 n=10+10)
    workgroup_raise   108µs ± 1%    71µs ± 1%  -34.64%  (p=0.000 n=10+10)

This patch is the final patch in series to reach the goal of providing
channels that could be used in Cython/nogil code.

Cython/nogil channels work is dedicated to the memory of Вера Павловна Супрун[4].

[4] https://navytux.spb.ru/memory/%D0%A2%D1%91%D1%82%D1%8F%20%D0%92%D0%B5%D1%80%D0%B0.pdf#page=3
parent 9efb6575
......@@ -170,7 +170,7 @@ located in `src/` under `$GOPATH`.
Cython/nogil API
----------------
Cython package `golang` provides *nogil* API with goroutines and
Cython package `golang` provides *nogil* API with goroutines, channels and
other features that mirror corresponding Python package. Cython API is not only
faster compared to Python version, but also, due to *nogil* property, allows to
build concurrent systems without limitations imposed by Python's GIL. All that
......@@ -178,14 +178,59 @@ while still programming in Python-like language. Brief description of
Cython/nogil API follows:
`go` spawns new task - a coroutine, or thread, depending on activated runtime.
For example::
`chan[T]` represents a channel with Go semantic and elements of type `T`.
Use `makechan[T]` to create new channel, and `chan[T].recv`, `chan[T].send`,
`chan[T].close` for communication. `nil` stands for the nil channel. `select`
can be used to multiplex on several channels. For example::
cdef nogil:
void worker():
pass
struct Point:
int x
int y
void worker(chan[int] chi, chan[Point] chp):
chi.send(1)
cdef Point p
p.x = 3
p.y = 4
chp.send(p)
void myfunc():
go(worker)
cdef chan[int] chi = makechan[int]() # synchronous channel of integers
cdef chan[Point] chp = makechan[Point](3) # channel with buffer of size 3 and Point elements
go(worker, chi, chp)
i = chi.recv() # will give 1
p = chp.recv() # will give Point(3,4)
chp = nil # rebind chp to nil channel
cdef cbool ok
cdef int j = 33
_ = select([
chi.recvs(&i) # 0
chi.recvs(&i, &ok), # 1
chi.sends(&j), # 2
chp.recvs(&p), # 3
default, # 4
])
if _ == 0:
# i is what was received from chi
...
if _ == 1:
# (i, ok) is what was received from chi
...
if _ == 2:
# we know j was sent to chi
...
if _ == 3:
# this case will be never selected because
# send/recv on nil channel block forever.
...
if _ == 4:
# default case
...
`panic` stops normal execution of current goroutine by throwing a C-level
exception. On Python/C boundaries C-level exceptions have to be converted to
......
......@@ -23,6 +23,7 @@ Cython/nogil API
----------------
- `go` spawns lightweight thread.
- `chan[T]`, `makechan[T]` and `select` provide C-level channels with Go semantic.
- `panic` stops normal execution of current goroutine by throwing a C-level exception.
Everything in Cython/nogil API do not depend on Python runtime and in
......@@ -38,10 +39,16 @@ Golang.py runtime
In addition to Cython/nogil API, golang.pyx provides runtime for golang.py:
- Python-level channels are represented by pychan + pyselect.
- Python-level panic is represented by pypanic.
"""
from libcpp cimport nullptr_t, nullptr as nil
from libcpp.utility cimport pair
cdef extern from *:
ctypedef bint cbool "bool"
# nogil pyx-level golang API.
#
# NOTE even though many functions may panic (= throw C++ exception) nothing is
......@@ -58,14 +65,63 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
void go(...) # typechecking is done by C
struct _chan
cppclass chan[T]:
chan();
# send/recv/close
void send(const T&)
T recv()
pair[T, cbool] recv_()
void close()
# send/recv in select
_selcase sends(const T *ptx)
_selcase recvs()
_selcase recvs(T* prx)
_selcase recvs(T* prx, cbool *pok)
# length/capacity
unsigned len()
unsigned cap()
# compare wrt nil; =nil
cbool operator==(nullptr_t)
cbool operator!=(nullptr_t)
void operator=(nullptr_t)
# for tests
_chan *_rawchan()
chan[T] makechan[T]()
chan[T] makechan[T](unsigned size)
struct structZ:
pass
enum _chanop:
_CHANSEND
_CHANRECV
_DEFAULT
struct _selcase:
_chanop op
void *data
cbool *rxok
const _selcase default "golang::_default"
int select(_selcase casev[])
# ---- python bits ----
cdef void topyexc() except *
cpdef pypanic(arg)
# pychan is chan<object>
from cpython cimport PyObject
ctypedef PyObject *pPyObject # https://github.com/cython/cython/issues/534
from cython cimport final
@final
cdef class pychan:
cdef dict __dict__
cdef chan[pPyObject] ch
......@@ -33,10 +33,15 @@ from __future__ import print_function, absolute_import
_init_libgolang()
from cpython cimport Py_INCREF, Py_DECREF, PY_MAJOR_VERSION
cdef extern from "Python.h":
ctypedef struct PyTupleObject:
PyObject **ob_item
void Py_FatalError(const char *msg)
from libcpp.vector cimport vector
from cython cimport final
import sys
import threading, collections, random
# ---- panic ----
......@@ -136,311 +141,98 @@ cdef void __goviac(void *arg) nogil:
# ---- channels ----
# _RecvWaiting represents a receiver waiting on a chan.
class _RecvWaiting(object):
# .group _WaitGroup group of waiters this receiver is part of
# .chan chan channel receiver is waiting on
#
# on wakeup: sender|closer -> receiver:
# .rx_ rx_ for recv
def __init__(self, group, ch):
self.group = group
self.chan = ch
group.register(self)
# wakeup notifies waiting receiver that recv_ completed.
def wakeup(self, rx, ok):
self.rx_ = (rx, ok)
self.group.wakeup()
# _SendWaiting represents a sender waiting on a chan.
class _SendWaiting(object):
# .group _WaitGroup group of waiters this sender is part of
# .chan chan channel sender is waiting on
# .obj object that was passed to send
#
# on wakeup: receiver|closer -> sender:
# .ok bool whether send succeeded (it will not on close)
def __init__(self, group, ch, obj):
self.group = group
self.chan = ch
self.obj = obj
group.register(self)
# wakeup notifies waiting sender that send completed.
def wakeup(self, ok):
self.ok = ok
self.group.wakeup()
# _WaitGroup is a group of waiting senders and receivers.
#
# Only 1 waiter from the group can succeed waiting.
class _WaitGroup(object):
# ._waitv [] of _{Send|Recv}Waiting
# ._sema semaphore used for wakeup
#
# ._mu lock NOTE ∀ chan order is always: chan._mu > ._mu
#
# on wakeup: sender|receiver -> group:
# .which _{Send|Recv}Waiting instance which succeeded waiting.
def __init__(self):
self._waitv = []
self._sema = threading.Lock() # in python it is valid to release lock from another thread.
self._sema.acquire()
self._mu = threading.Lock()
self.which = None
def register(self, wait):
self._waitv.append(wait)
# try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
#
# -> ok: true if won, false - if not.
def try_to_win(self, waiter):
with self._mu:
if self.which is not None:
return False
else:
self.which = waiter
return True
# wait waits for winning case of group to complete.
def wait(self):
self._sema.acquire()
# wakeup wakes up the group.
#
# prior to wakeup try_to_win must have been called.
# in practice this means that waiters queued to chan.{_send|_recv}q must
# be dequeued with _dequeWaiter.
def wakeup(self):
assert self.which is not None
self._sema.release()
# dequeAll removes all registered waiters from their wait queues.
def dequeAll(self):
for w in self._waitv:
ch = w.chan
if isinstance(w, _SendWaiting):
queue = ch._sendq
else:
assert isinstance(w, _RecvWaiting)
queue = ch._recvq
with ch._mu:
try:
queue.remove(w)
except ValueError:
pass
# _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
#
# the channel owning {_recv|_send}q must be locked.
def _dequeWaiter(queue):
while len(queue) > 0:
w = queue.popleft()
# if this waiter can win its group - return it.
# if not - someone else from its group already has won, and so we anyway have
# to remove the waiter from the queue.
if w.group.try_to_win(w):
return w
return None
# pychan is Python channel with Go semantic.
# pychan is chan<object>.
@final
cdef class pychan:
# ._cap channel capacity
# ._mu lock
# ._dataq deque *: data buffer
# ._recvq deque _RecvWaiting: blocked receivers
# ._sendq deque _SendWaiting: blocked senders
# ._closed bool
def __init__(ch, size=0):
ch._cap = size
ch._mu = threading.Lock()
ch._dataq = collections.deque()
ch._recvq = collections.deque()
ch._sendq = collections.deque()
ch._closed = False
# send sends object to a receiver.
def send(ch, obj):
if ch is pynilchan:
_blockforever()
ch._mu.acquire()
if 1:
ok = ch._trysend(obj)
if ok:
return
def __cinit__(pych, size=0):
pych.ch = makechan_pyobj_pyexc(size)
def __dealloc__(pych):
# on del: drain buffered channel to decref sent objects.
# verify that the channel is not connected anywhere outside us.
# (if it was present also somewhere else - draining would be incorrect)
if pych.ch == nil:
return
cdef int refcnt = _chanrefcnt(pych.ch._rawchan())
if refcnt != 1:
# cannot raise py-level exception in __dealloc__
Py_FatalError("pychan.__dealloc__: chan.refcnt=%d ; must be =1" % refcnt)
cdef chan[pPyObject] ch = pych.ch
pych.ch = nil # does _chanxdecref(ch)
cdef PyObject *_rx
while ch.len() != 0:
# NOTE *not* chanrecv_pyexc(ch):
# - recv must not block and must not panic as we verified that we
# are the only holder of the channel and that ch buffer is not empty.
# - even if recv panics, we cannot convert that panic to python
# exception in __dealloc__. So if it really panics - let the
# panic make it and crash the process similarly to Py_FatalError above.
_rx = ch.recv()
Py_DECREF(<object>_rx)
# ch is decref'ed automatically at return
g = _WaitGroup()
me = _SendWaiting(g, ch, obj)
ch._sendq.append(me)
ch._mu.release()
# send sends object to a receiver.
def send(pych, obj):
# increment obj reference count - until received the channel is holding pointer to the object.
Py_INCREF(obj)
g.wait()
assert g.which is me
if not me.ok:
pypanic("send on closed channel")
try:
with nogil:
chansend_pyexc(pych.ch, <PyObject *>obj)
except: # not only _PanicError as send can also throw e.g. bad_alloc
# the object was not sent - e.g. it was "send on a closed channel"
Py_DECREF(obj)
raise
# recv_ is "comma-ok" version of recv.
#
# ok is true - if receive was delivered by a successful send.
# ok is false - if receive is due to channel being closed and empty.
def recv_(ch): # -> (rx, ok)
if ch is pynilchan:
_blockforever()
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
return rx_
def recv_(pych): # -> (rx, ok)
cdef PyObject *_rx = NULL
cdef bint ok
g = _WaitGroup()
me = _RecvWaiting(g, ch)
ch._recvq.append(me)
with nogil:
_rx, ok = chanrecv__pyexc(pych.ch)
ch._mu.release()
if not ok:
return (None, ok)
g.wait()
assert g.which is me
return me.rx_
# we received the object and the channel dropped pointer to it.
rx = <object>_rx
Py_DECREF(rx)
return (rx, ok)
# recv receives from the channel.
def recv(ch): # -> rx
rx, _ = ch.recv_()
def recv(pych): # -> rx
rx, _ = pych.recv_() # TODO call recv_ via C
return rx
# _trysend(obj) -> ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _trysend(ch, obj):
if ch._closed:
ch._mu.release()
pypanic("send on closed channel")
# synchronous channel
if ch._cap == 0:
recv = _dequeWaiter(ch._recvq)
if recv is None:
return False
ch._mu.release()
recv.wakeup(obj, True)
return True
# buffered channel
else:
if len(ch._dataq) >= ch._cap:
return False
ch._dataq.append(obj)
recv = _dequeWaiter(ch._recvq)
if recv is not None:
rx = ch._dataq.popleft()
ch._mu.release()
recv.wakeup(rx, True)
else:
ch._mu.release()
return True
# _tryrecv() -> rx_=(rx, ok), ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _tryrecv(ch):
# buffered
if len(ch._dataq) > 0:
rx = ch._dataq.popleft()
# wakeup a blocked writer, if there is any
send = _dequeWaiter(ch._sendq)
if send is not None:
ch._dataq.append(send.obj)
ch._mu.release()
send.wakeup(True)
else:
ch._mu.release()
return (rx, True), True
# closed
if ch._closed:
ch._mu.release()
return (None, False), True
# sync | empty: there is waiting writer
send = _dequeWaiter(ch._sendq)
if send is None:
return (None, False), False
ch._mu.release()
rx = send.obj
send.wakeup(True)
return (rx, True), True
# close closes sending side of the channel.
def close(pych):
with nogil:
chanclose_pyexc(pych.ch)
def __len__(pych):
return chanlen_pyexc(pych.ch)
# close closes sending side of the channel.
def close(ch):
if ch is pynilchan:
pypanic("close of nil channel")
recvv = []
sendv = []
with ch._mu:
if ch._closed:
pypanic("close of closed channel")
ch._closed = True
# schedule: wake-up all readers
while 1:
recv = _dequeWaiter(ch._recvq)
if recv is None:
break
recvv.append(recv)
# schedule: wake-up all writers (they will panic)
while 1:
send = _dequeWaiter(ch._sendq)
if send is None:
break
sendv.append(send)
# perform scheduled wakeups outside of ._mu
for recv in recvv:
recv.wakeup(None, False)
for send in sendv:
send.wakeup(False)
def __len__(ch):
return len(ch._dataq)
def __repr__(ch):
if ch is pynilchan:
def __repr__(pych):
if pych.ch == nil:
return "nilchan"
else:
return super(pychan, ch).__repr__()
return super(pychan, pych).__repr__()
# pynilchan is the nil py channel.
#
# On nil channel: send/recv block forever; close panics.
pynilchan = pychan(None) # TODO -> <chan*>(NULL) after move to Cython
cdef pychan _pynilchan = pychan()
_pynilchan.ch = chan[pPyObject]() # = NULL
pynilchan = _pynilchan
# pydefault represents default case for pyselect.
......@@ -474,164 +266,90 @@ pydefault = object()
# # default case
# ...
def pyselect(*pycasev):
# select promise: if multiple cases are ready - one will be selected randomly
npycasev = list(enumerate(pycasev))
random.shuffle(npycasev)
# first pass: poll all cases and bail out in the end if default was provided
recvv = [] # [](n, ch, commaok)
sendv = [] # [](n, ch, tx)
ndefault = None
for (n, pycase) in npycasev:
# default: remember we have it
cdef int i, n = len(pycasev), selected
cdef vector[_selcase] casev = vector[_selcase](n)
cdef pychan pych
cdef PyObject *_rx = NULL # all select recvs are setup to receive into _rx
cdef cbool rxok = False # (its ok as only one receive will be actually executed)
# prepare casev for chanselect
for i in range(n):
pycase = pycasev[i]
# default
if pycase is pydefault:
if ndefault is not None:
pypanic("pyselect: multiple default")
ndefault = n
casev[i] = default
# send
elif type(pycase) is tuple:
if len(pycase) != 2:
pypanic("pyselect: invalid [%d]() case" % len(pycase))
_tcase = <PyTupleObject *>pycase
pysend, tx = pycase
pysend = <object>(_tcase.ob_item[0])
if pysend.__self__.__class__ is not pychan:
pypanic("pyselect: send on non-chan: %r" % (pysend.__self__.__class__,))
ch = pysend.__self__
pych = pysend.__self__
if pysend.__name__ != "send": # XXX better check PyCFunction directly
pypanic("pyselect: send expected: %r" % (pysend,))
if ch is not pynilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
ok = ch._trysend(tx)
if ok:
return n, None
ch._mu.release()
# wire ptx through pycase[1]
p_tx = &(_tcase.ob_item[1])
tx = <object>(p_tx[0])
sendv.append((n, ch, tx))
# incref tx as if corresponding channel is holding pointer to the object while it is being sent.
# we'll decref the object if it won't be sent.
# see pychan.send for details.
Py_INCREF(tx)
casev[i] = pych.ch.sends(p_tx)
# recv
else:
pyrecv = pycase
if pyrecv.__self__.__class__ is not pychan:
pypanic("pyselect: recv on non-chan: %r" % (pyrecv.__self__.__class__,))
ch = pyrecv.__self__
pych = pyrecv.__self__
if pyrecv.__name__ == "recv": # XXX better check PyCFunction directly
commaok = False
casev[i] = pych.ch.recvs(&_rx)
elif pyrecv.__name__ == "recv_": # XXX better check PyCFunction directly
commaok = True
casev[i] = pych.ch.recvs(&_rx, &rxok)
else:
pypanic("pyselect: recv expected: %r" % (pyrecv,))
if ch is not pynilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
ch._mu.release()
recvv.append((n, ch, commaok))
# execute default if we have it
if ndefault is not None:
return ndefault, None
# select{} or with nil-channels only -> block forever
if len(recvv) + len(sendv) == 0:
_blockforever()
# second pass: subscribe and wait on all rx/tx cases
g = _WaitGroup()
# selected returns what was selected in g.
# the return signature is the one of select.
def selected():
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
pypanic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
selected = -1
try:
for n, ch, tx in sendv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
ok = ch._trysend(tx)
if ok:
# don't let already queued cases win
g.which = "tx prepoll won" # !None
return n, None
w = _SendWaiting(g, ch, tx)
w.sel_n = n
ch._sendq.append(w)
ch._mu.release()
for n, ch, commaok in recvv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
rx_, ok = ch._tryrecv()
if ok:
# don't let already queued cases win
g.which = "rx prepoll won" # !None
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
w = _RecvWaiting(g, ch)
w.sel_n = n
w.sel_commaok = commaok
ch._recvq.append(w)
ch._mu.release()
return selected()
with nogil:
selected = _chanselect_pyexc(&casev[0], casev.size())
finally:
# unsubscribe not-succeeded waiters
g.dequeAll()
# _blockforever blocks current goroutine forever.
_tblockforever = None
def _blockforever():
if _tblockforever is not None:
_tblockforever()
# take a lock twice. It will forever block on the second lock attempt.
# Under gevent, similarly to Go, this raises "LoopExit: This operation
# would block forever", if there are no other greenlets scheduled to be run.
dead = threading.Lock()
dead.acquire()
dead.acquire()
# decref not sent tx (see ^^^ send prepare)
for i in range(n):
if casev[i].op == _CHANSEND and (i != selected):
p_tx = <PyObject **>casev[i].data
_tx = p_tx[0]
tx = <object>_tx
Py_DECREF(tx)
# return what was selected
cdef _chanop op = casev[selected].op
if op == _DEFAULT:
return selected, None
if op == _CHANSEND:
return selected, None
if op != _CHANRECV:
raise AssertionError("pyselect: chanselect returned with bad op")
# we received NULL or the object; if it is object, corresponding channel
# dropped pointer to it (see pychan.recv_ for details).
cdef object rx = None
if _rx != NULL:
rx = <object>_rx
Py_DECREF(rx)
if casev[selected].rxok != NULL:
return selected, (rx, rxok)
else:
return selected, rx
# ---- init libgolang runtime ---
......@@ -668,9 +386,30 @@ cdef void _init_libgolang() except*:
# ---- misc ----
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _chanrefcnt(_chan *ch)
int _chanselect(_selcase *casev, int casec)
void _taskgo(void (*f)(void *), void *arg)
cdef nogil:
chan[pPyObject] makechan_pyobj_pyexc(unsigned size) except +topyexc:
return makechan[pPyObject](size)
void chansend_pyexc(chan[pPyObject] ch, PyObject *_tx) except +topyexc:
ch.send(_tx)
(PyObject*, bint) chanrecv__pyexc(chan[pPyObject] ch) except +topyexc:
_ = ch.recv_()
return (_.first, _.second) # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
void chanclose_pyexc(chan[pPyObject] ch) except +topyexc:
ch.close()
unsigned chanlen_pyexc(chan[pPyObject] ch) except +topyexc:
return ch.len()
int _chanselect_pyexc(const _selcase *casev, int casec) except +topyexc:
return _chanselect(casev, casec)
void _taskgo_pyexc(void (*f)(void *) nogil, void *arg) except +topyexc:
_taskgo(f, arg)
......@@ -26,21 +26,30 @@
from __future__ import print_function, absolute_import
from golang cimport go, pychan, panic, pypanic, topyexc
from golang import nilchan
from golang import _golang
from golang cimport go, chan, _chan, makechan, pychan, nil, select, \
default, structZ, panic, pypanic, topyexc, cbool
from golang import time
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _tchanrecvqlen(_chan *ch)
int _tchansendqlen(_chan *ch)
void (*_tblockforever)()
# pylen_{recv,send}q returns len(pych._{recv,send}q)
# pylen_{recv,send}q returns len(_chan._{recv,send}q)
def pylen_recvq(pychan pych not None): # -> int
if pych is nilchan:
if pych.ch == nil:
raise AssertionError('len(.recvq) on nil channel')
return len(pych._recvq)
return _tchanrecvqlen(pych.ch._rawchan())
def pylen_sendq(pychan pych not None): # -> int
if pych is nilchan:
if pych.ch == nil:
raise AssertionError('len(.sendq) on nil channel')
return len(pych._sendq)
return _tchansendqlen(pych.ch._rawchan())
# runtime/libgolang_test.cpp
cdef extern from *:
"""
extern void waitBlocked(golang::_chan *ch, bool rx, bool tx);
"""
void waitBlocked(_chan *, bint rx, bint tx) nogil except +topyexc
# pywaitBlocked waits till a receive or send pychan operation blocks waiting on the channel.
#
......@@ -49,7 +58,8 @@ def pywaitBlocked(pychanop):
if pychanop.__self__.__class__ is not pychan:
pypanic("wait blocked: %r is method of a non-chan: %r" % (pychanop, pychanop.__self__.__class__))
cdef pychan pych = pychanop.__self__
recv = send = False
cdef bint recv = False
cdef bint send = False
if pychanop.__name__ == "recv": # XXX better check PyCFunction directly
recv = True
elif pychanop.__name__ == "send": # XXX better check PyCFunction directly
......@@ -57,41 +67,84 @@ def pywaitBlocked(pychanop):
else:
pypanic("wait blocked: unexpected chan method: %r" % (pychanop,))
t0 = time.now()
while 1:
with pych._mu:
if recv and pylen_recvq(pych) > 0:
return
if send and pylen_sendq(pych) > 0:
return
now = time.now()
if now-t0 > 10: # waited > 10 seconds - likely deadlock
pypanic("deadlock")
time.sleep(0) # yield to another thread / coroutine
with nogil:
waitBlocked(pych.ch._rawchan(), recv, send)
# `with pypanicWhenBlocked` hooks into _golang._blockforever to raise panic with
# `with pypanicWhenBlocked` hooks into libgolang _blockforever to raise panic with
# "t: blocks forever" instead of blocking.
cdef class pypanicWhenBlocked:
def __enter__(t):
assert _golang._tblockforever is None
_golang._tblockforever = _panicblocked
global _tblockforever
_tblockforever = _panicblocked
return t
def __exit__(t, typ, val, tb):
_golang._tblockforever = None
def _panicblocked():
pypanic("t: blocks forever")
_tblockforever = NULL
cdef void _panicblocked() nogil:
panic("t: blocks forever")
# small test to verify pyx(nogil) channels.
ctypedef struct Point:
int x
int y
# TODO kill this and teach Cython to coerce pair[X,Y] -> (X,Y)
cdef (int, cbool) recv_(chan[int] ch) nogil:
_ = ch.recv_()
return (_.first, _.second)
cdef void _test_chan_nogil() nogil except +topyexc:
cdef chan[structZ] done = makechan[structZ]()
cdef chan[int] chi = makechan[int](1)
cdef chan[Point] chp = makechan[Point]()
chp = nil # reset to nil
cdef int i, j
cdef Point p
cdef cbool jok
i=+1; chi.send(i)
j=-1; j = chi.recv()
if not (j == i):
panic("send -> recv != I")
i = 2
_=select([
done.recvs(), # 0
chi.sends(&i), # 1
chp.recvs(&p), # 2
chi.recvs(&j, &jok), # 3
default, # 4
])
if _ != 1:
panic("select: selected !1")
j, jok = recv_(chi)
if not (j == 2 and jok == True):
panic("recv_ != (2, true)")
chi.close()
j, jok = recv_(chi)
if not (j == 0 and jok == False):
panic("recv_ from closed != (0, false)")
def test_chan_nogil():
with nogil:
_test_chan_nogil()
# small test to verify pyx(nogil) go.
cdef void _test_go_nogil() nogil except +topyexc:
go(_work, 111)
# TODO wait till _work is done
cdef void _work(int i) nogil:
cdef chan[structZ] done = makechan[structZ]()
go(_work, 111, done)
done.recv()
cdef void _work(int i, chan[structZ] done) nogil:
if i != 111:
panic("_work: i != 111")
done.close()
def test_go_nogil():
with nogil:
......@@ -101,9 +154,14 @@ def test_go_nogil():
# runtime/libgolang_test_c.c
cdef extern from * nogil:
"""
extern "C" void _test_chan_c();
extern "C" void _test_go_c();
"""
void _test_chan_c() except +topyexc
void _test_go_c() except +topyexc
def test_chan_c():
with nogil:
_test_chan_c()
def test_go_c():
with nogil:
_test_go_c()
......@@ -111,9 +169,24 @@ def test_go_c():
# runtime/libgolang_test.cpp
cdef extern from * nogil:
"""
extern void _test_chan_cpp_refcount();
extern void _test_chan_cpp();
extern void _test_chan_vs_stackdeadwhileparked();
extern void _test_go_cpp();
"""
void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc
void _test_chan_vs_stackdeadwhileparked() except +topyexc
void _test_go_cpp() except +topyexc
def test_chan_cpp_refcount():
with nogil:
_test_chan_cpp_refcount()
def test_chan_cpp():
with nogil:
_test_chan_cpp()
def test_chan_vs_stackdeadwhileparked():
with nogil:
_test_chan_vs_stackdeadwhileparked()
def test_go_cpp():
with nogil:
_test_go_cpp()
......@@ -159,6 +159,11 @@ def test_chan():
assert w2() is not None
ch = None
gc.collect()
# pypy needs another GC run: pychan does Py_DECREF on buffered objects, but
# on pypy cpyext objects are not deallocated from Py_DECREF even if
# ob_refcnt goes to zero - the deallocation is delayed until GC run.
# see also: http://doc.pypy.org/en/latest/discussion/rawrefcount.html
gc.collect()
assert w1() is None
assert w2() is None
......
......@@ -22,9 +22,10 @@
// Library Libgolang provides Go-like features for C and C++.
//
// Library Libgolang provides goroutines and other
// Library Libgolang provides goroutines, channels with Go semantic and other
// accompanying features. The library consists of high-level type-safe C++ API,
// and low-level unsafe C API.
// and low-level unsafe C API. The low-level C API was inspired by Libtask[1]
// and Plan9/Libthread[2].
//
// The primary motivation for Libgolang is to serve as runtime for golang.pyx -
// - Cython part of Pygolang project. However Libgolang is independent of
......@@ -35,12 +36,29 @@
// C++-level API
//
// - `go` spawns new task.
// - `chan<T>`, and `select` provide channels with Go semantic and automatic
// lifetime management.
// - `sleep` pauses current task.
// - `panic` throws exception that represent C-level panic.
//
// For example:
//
// go(worker, 1); // spawn worker(int)
// chan<int> ch = makechan<int>(); // create new channel
// go(worker, ch, 1); // spawn worker(chan<int>, int)
// ch.send(1)
// j = ch.recv()
//
// _ = select({
// _default, // 0
// ch.sends(&i), // 1
// ch.recvs(&j), // 2
// });
// if (_ == 0)
// // default case selected
// if (_ == 1)
// // case 1 selected: i sent to ch
// if (_ == 2)
// // case 2 selected: j received from ch
//
// if (<bug condition>)
// panic("bug");
......@@ -49,6 +67,10 @@
// C-level API
//
// - `_taskgo` spawns new task.
// - `_makechan` creates raw channel with Go semantic.
// - `_chanxincref` and `_chanxdecref` manage channel lifetime.
// - `_chansend` and `_chanrecv` send/receive over raw channel.
// - `_chanselect`, `_selsend`, `_selrecv`, ... provide raw select functionality.
// - `tasknanosleep` pauses current task.
//
//
......@@ -65,6 +87,10 @@
//
// Once again, Libgolang itself is independent from Python, and other runtimes
// are possible.
//
//
// [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
// [2] http://9p.io/magic/man2html/2/thread.
#include <stdbool.h>
#include <stddef.h>
......@@ -104,10 +130,88 @@ LIBGOLANG_API void _taskgo(void (*f)(void *arg), void *arg);
LIBGOLANG_API void _tasknanosleep(uint64_t dt);
LIBGOLANG_API uint64_t _nanotime(void);
typedef struct _chan _chan;
LIBGOLANG_API _chan *_makechan(unsigned elemsize, unsigned size);
LIBGOLANG_API void _chanxincref(_chan *ch);
LIBGOLANG_API void _chanxdecref(_chan *ch);
LIBGOLANG_API int _chanrefcnt(_chan *ch);
LIBGOLANG_API void _chansend(_chan *ch, const void *ptx);
LIBGOLANG_API void _chanrecv(_chan *ch, void *prx);
LIBGOLANG_API bool _chanrecv_(_chan *ch, void *prx);
LIBGOLANG_API void _chanclose(_chan *ch);
LIBGOLANG_API unsigned _chanlen(_chan *ch);
LIBGOLANG_API unsigned _chancap(_chan *ch);
enum _chanop {
_CHANSEND = 0,
_CHANRECV = 1,
_DEFAULT = 2,
};
// _selcase represents one select case.
typedef struct _selcase {
_chan *ch; // channel
enum _chanop op; // chansend/chanrecv/default
void *data; // chansend: ptx; chanrecv: prx
bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used
} _selcase;
LIBGOLANG_API int _chanselect(const _selcase *casev, int casec);
// _selsend creates `_chansend(ch, ptx)` case for _chanselect.
static inline
_selcase _selsend(_chan *ch, const void *ptx) {
_selcase _ = {
.ch = ch,
.op = _CHANSEND,
.data = (void *)ptx,
.rxok = NULL,
};
return _;
}
// _selrecv creates `_chanrecv(ch, prx)` case for _chanselect.
static inline
_selcase _selrecv(_chan *ch, void *prx) {
_selcase _ = {
.ch = ch,
.op = _CHANRECV,
.data = prx,
.rxok = NULL,
};
return _;
}
// _selrecv_ creates `*pok = _chanrecv_(ch, prx)` case for _chanselect.
static inline
_selcase _selrecv_(_chan *ch, void *prx, bool *pok) {
_selcase _ = {
.ch = ch,
.op = _CHANRECV,
.data = prx,
.rxok = pok,
};
return _;
}
// _default represents default case for _chanselect.
extern LIBGOLANG_API const _selcase _default;
// libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema;
typedef enum _libgolang_runtime_flags {
// STACK_DEAD_WHILE_PARKED indicates that it is not safe to access
// goroutine's stack memory while the goroutine is parked.
//
// for example gevent/greenlet/stackless use it because they copy g's stack
// to heap on park and back on unpark. This way if objects on g's stack
// were accessed while g was parked it would be memory of another g's stack.
STACK_DEAD_WHILE_PARKED = 1,
} _libgolang_runtime_flags;
typedef struct _libgolang_runtime_ops {
_libgolang_runtime_flags flags;
// go should spawn a task (coroutine/thread/...).
void (*go)(void (*f)(void *), void *arg);
......@@ -136,6 +240,11 @@ typedef struct _libgolang_runtime_ops {
LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
// for testing
LIBGOLANG_API int _tchanrecvqlen(_chan *ch);
LIBGOLANG_API int _tchansendqlen(_chan *ch);
LIBGOLANG_API extern void (*_tblockforever)(void);
#ifdef __cplusplus
}}
#endif
......@@ -145,8 +254,12 @@ LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
#ifdef __cplusplus
#include <exception>
#include <functional>
#include <initializer_list>
#include <memory>
#include <type_traits>
#include <utility>
namespace golang {
......@@ -162,6 +275,122 @@ static inline void go(F /*std::function<void(Argv...)>*/ f, Argv... argv) {
}, frun);
}
template<typename T> class chan;
template<typename T> static chan<T> makechan(unsigned size=0);
// chan<T> provides type-safe wrapper over _chan.
//
// chan<T> is automatically reference-counted and is safe to use from multiple
// goroutines simultaneously.
template<typename T>
class chan {
_chan *_ch;
public:
inline chan() { _ch = NULL; } // nil channel if not explicitly initialized
friend chan<T> makechan<T>(unsigned size);
inline ~chan() { _chanxdecref(_ch); _ch = NULL; }
// = nil
inline chan(nullptr_t) { _ch = NULL; }
inline chan& operator=(nullptr_t) { _chanxdecref(_ch); _ch = NULL; return *this; }
// copy
inline chan(const chan& from) { _ch = from._ch; _chanxincref(_ch); }
inline chan& operator=(const chan& from) {
if (this != &from) {
_chanxdecref(_ch); _ch = from._ch; _chanxincref(_ch);
}
return *this;
}
// move
inline chan(chan&& from) { _ch = from._ch; from._ch = NULL; }
inline chan& operator=(chan&& from) {
if (this != &from) {
_chanxdecref(_ch); _ch = from._ch; from._ch = NULL;
}
return *this;
}
// _chan does plain memcpy to copy elements.
// TODO allow all types (e.g. element=chan )
static_assert(std::is_trivially_copyable<T>::value, "TODO chan<T>: T copy is not trivial");
// send/recv/close
inline void send(const T &ptx) { _chansend(_ch, &ptx); }
inline T recv() { T rx; _chanrecv(_ch, &rx); return rx; }
inline std::pair<T,bool> recv_() { T rx; bool ok = _chanrecv_(_ch, &rx);
return std::make_pair(rx, ok); }
inline void close() { _chanclose(_ch); }
// send/recv in select
// ch.sends creates `ch.send(*ptx)` case for select.
[[nodiscard]] inline _selcase sends(const T *ptx) { return _selsend(_ch, ptx); }
// ch.recvs creates `*prx = ch.recv()` case for select.
//
// if pok is provided the case is extended to `[*prx, *pok] = ch.recv_()`
// if both prx and pok are omitted the case is reduced to `ch.recv()`.
[[nodiscard]] inline _selcase recvs(T *prx=NULL, bool *pok=NULL) {
return _selrecv_(_ch, prx, pok);
}
// length/capacity
inline unsigned len() { return _chanlen(_ch); }
inline unsigned cap() { return _chancap(_ch); }
// compare wrt nil
inline bool operator==(nullptr_t) { return (_ch == NULL); }
inline bool operator!=(nullptr_t) { return (_ch != NULL); }
// compare wrt chan
inline bool operator==(const chan<T>& ch2) { return (_ch == ch2._ch); }
inline bool operator!=(const chan<T>& ch2) { return (_ch != ch2._ch); }
// for testing
inline _chan *_rawchan() { return _ch; }
};
// makechan<T> makes new chan<T> with capacity=size.
template<typename T> static inline
chan<T> makechan(unsigned size) {
chan<T> ch;
unsigned elemsize = std::is_empty<T>::value
? 0 // eg struct{} for which sizeof() gives 1 - *not* 0
: sizeof(T);
ch._ch = _makechan(elemsize, size);
if (ch._ch == NULL)
throw std::bad_alloc();
return ch;
}
// structZ is struct{}.
//
// it's a workaround for e.g. makechan<struct{}> giving
// "error: types may not be defined in template arguments".
struct structZ{};
// select, together with chan<T>.sends and chan<T>.recvs, provide type-safe
// wrappers over _chanselect and _selsend/_selrecv/_selrecv_.
//
// Usage example:
//
// _ = select({
// ch1.recvs(&v), // 0
// ch2.recvs(&v, &ok), // 1
// ch2.sends(&v), // 2
// _default, // 3
// })
static inline // select({case1, case2, case3})
int select(const std::initializer_list<const _selcase> &casev) {
return _chanselect(casev.begin(), casev.size());
}
template<size_t N> static inline // select(casev_array)
int select(const _selcase (&casev)[N]) {
return _chanselect(&casev[0], N);
}
namespace time {
......
......@@ -23,18 +23,26 @@
# Small program that uses a bit of golang.pyx nogil features, mainly to verify
# that external project can build against golang in pyx mode.
from golang cimport go, topyexc
from golang cimport go, chan, makechan, topyexc
from libc.stdio cimport printf
cdef nogil:
void worker(int i, int j):
pass
void worker(chan[int] ch, int i, int j):
ch.send(i*j)
void _main() except +topyexc:
cdef chan[int] ch = makechan[int]()
cdef int i
for i in range(3):
go(worker, i, 4)
go(worker, ch, i, 4)
for i in range(3):
ch.recv()
ch.close()
#_, ok = ch.recv_() # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
ch.recv_()
printf("test.pyx: OK\n")
......
......@@ -23,8 +23,12 @@ from libc.stdint cimport uint64_t
cdef extern from "golang/libgolang.h" nogil:
struct _libgolang_sema
enum _libgolang_runtime_flags:
STACK_DEAD_WHILE_PARKED
struct _libgolang_runtime_ops:
_libgolang_runtime_flags flags
void (*go)(void (*f)(void *) nogil, void *arg);
_libgolang_sema* (*sema_alloc) ()
......
......@@ -42,7 +42,7 @@ from cpython cimport Py_INCREF, Py_DECREF
from cython cimport final
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic
STACK_DEAD_WHILE_PARKED, panic
from golang.runtime cimport _runtime_thread
......@@ -125,6 +125,10 @@ cdef nogil:
# XXX const
_libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops(
# when greenlet is switched to another, its stack is copied to
# heap, and stack of switched-to greenlet is copied back to C stack.
flags = STACK_DEAD_WHILE_PARKED,
go = go,
sema_alloc = sema_alloc,
sema_free = sema_free,
......
......@@ -80,7 +80,7 @@ cdef extern from "pythread.h" nogil:
void PyThread_free_lock(PyThread_type_lock)
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic
_libgolang_runtime_flags, panic
from libc.stdint cimport uint64_t, UINT64_MAX
IF POSIX:
......@@ -168,6 +168,7 @@ cdef nogil:
# XXX const
_libgolang_runtime_ops thread_ops = _libgolang_runtime_ops(
flags = <_libgolang_runtime_flags>0,
go = go,
sema_alloc = sema_alloc,
sema_free = sema_free,
......
......@@ -20,18 +20,26 @@
// Library Libgolang provides Go-like features for C and C++.
// See libgolang.h for library overview.
// Pygolang C part: provides runtime implementation of panic, etc.
// Pygolang C part: provides runtime implementation of panic, channels, etc.
//
// C++ (not C) is used:
// - to implement C-level panic (via C++ exceptions).
// - to provide chan<T> template that can be used as chan[T] in Cython.
// - because Cython (currently ?) does not allow to add methods to `cdef struct`.
#include "golang/libgolang.h"
#include <algorithm>
#include <atomic>
#include <exception>
#include <functional>
#include <limits>
#include <memory>
#include <mutex> // lock_guard
#include <random>
#include <string>
#include <stdlib.h>
#include <string.h>
// linux/list.h needs ARRAY_SIZE XXX -> better use c.h or ccan/array_size.h ?
......@@ -40,9 +48,14 @@
#endif
#include <linux/list.h>
using std::atomic;
using std::bad_alloc;
using std::exception;
using std::max;
using std::numeric_limits;
using std::string;
using std::unique_ptr;
using std::vector;
namespace golang {
......@@ -168,6 +181,949 @@ private:
// with_lock mimics `with mu` from python.
#define with_lock(mu) std::lock_guard<Mutex> _with_lock_ ## __COUNTER__ (mu)
// defer(f) mimics defer from golang.
// XXX f is called at end of current scope, not function.
#define defer(f) _deferred _defer_ ## __COUNTER__ (f)
struct _deferred {
typedef std::function<void(void)> F;
F f;
_deferred(F f) : f(f) {}
~_deferred() { f(); }
private:
_deferred(const _deferred&); // don't copy
};
// ---- channels -----
struct _WaitGroup;
struct _RecvSendWaiting;
// _chan is a raw channel with Go semantic.
//
// Over raw channel the data is sent/received via elemsize'ed memcpy of void*
// and the caller must make sure to pass correct arguments.
//
// See chan<T> for type-safe wrapper.
//
// _chan is not related to Python runtime and works without GIL if libgolang
// runtime works without GIL(*).
//
// (*) for example "thread" runtime works without GIL, while "gevent" runtime
// acquires GIL on every semaphore acquire.
struct _chan {
atomic<int> _refcnt; // reference counter for _chan object
unsigned _cap; // channel capacity (in elements)
unsigned _elemsize; // size of element
Mutex _mu;
list_head _recvq; // blocked receivers (_ -> _RecvSendWaiting.in_rxtxq)
list_head _sendq; // blocked senders (_ -> _RecvSendWaiting.in_rxtxq)
bool _closed;
// data queue (circular buffer) goes past _chan memory and occupies [_cap*_elemsize] bytes.
unsigned _dataq_n; // total number of entries in dataq
unsigned _dataq_r; // index for next read (in elements; can be used only if _dataq_n > 0)
unsigned _dataq_w; // index for next write (in elements; can be used only if _dataq_n < _cap)
void incref();
void decref();
int refcnt();
void send(const void *ptx);
bool recv_(void *prx);
void recv(void *prx);
bool _trysend(const void *tx);
bool _tryrecv(void *prx, bool *pok);
void close();
unsigned len();
unsigned cap();
void _dataq_append(const void *ptx);
void _dataq_popleft(void *prx);
private:
_chan(const _chan&); // don't copy
template<bool onstack> void _send2 (const void *);
void __send2 (const void *, _WaitGroup*, _RecvSendWaiting*);
template<bool onstack> bool _recv2_(void *);
bool __recv2_(void *, _WaitGroup*, _RecvSendWaiting*);
};
// _RecvSendWaiting represents a receiver/sender waiting on a chan.
struct _RecvSendWaiting {
_WaitGroup *group; // group of waiters this receiver/sender is part of
_chan *chan; // channel receiver/sender is waiting on
list_head in_rxtxq; // in recv or send queue of the channel (_chan._recvq|_sendq -> _)
// recv: on wakeup: sender|closer -> receiver; NULL means "don't copy received value"
// send: ptr-to data to send
void *pdata;
// on wakeup: whether recv/send succeeded (send fails on close)
bool ok;
// this wait is used in under select as case #sel_n
int sel_n;
_RecvSendWaiting();
void init(_WaitGroup *group, _chan *ch);
void wakeup(bool ok);
private:
_RecvSendWaiting(const _RecvSendWaiting&); // don't copy
};
// _WaitGroup is a group of waiting senders and receivers.
//
// Only 1 waiter from the group can succeed waiting.
struct _WaitGroup {
Sema _sema; // used for wakeup
Mutex _mu; // lock NOTE ∀ chan order is always: chan._mu > ._mu
// on wakeup: sender|receiver -> group:
// .which _{Send|Recv}Waiting instance which succeeded waiting.
const _RecvSendWaiting *which;
_WaitGroup();
bool try_to_win(_RecvSendWaiting *waiter);
void wait();
void wakeup();
private:
_WaitGroup(const _WaitGroup&); // don't copy
};
// Default _RecvSendWaiting ctor creates zero-value _RecvSendWaiting.
// zero value _RecvSendWaiting is invalid and must be initialized via .init before use.
_RecvSendWaiting::_RecvSendWaiting() {
_RecvSendWaiting *w = this;
bzero((void *)w, sizeof(*w));
}
// init initializes waiter to be part of group waiting on ch.
void _RecvSendWaiting::init(_WaitGroup *group, _chan *ch) {
_RecvSendWaiting *w = this;
if (w->group != NULL)
bug("_RecvSendWaiting: double init");
w->group = group;
w->chan = ch;
INIT_LIST_HEAD(&w->in_rxtxq);
w->pdata = NULL;
w->ok = false;
w->sel_n = -1;
}
// wakeup notifies waiting receiver/sender that corresponding operation completed.
void _RecvSendWaiting::wakeup(bool ok) {
_RecvSendWaiting *w = this;
w->ok = ok;
w->group->wakeup();
}
_WaitGroup::_WaitGroup() {
_WaitGroup *group = this;
group->_sema.acquire();
group->which = NULL;
}
// try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
//
// -> won: true if won, false - if not.
bool _WaitGroup::try_to_win(_RecvSendWaiting *waiter) { // -> won
_WaitGroup *group = this;
bool won;
group->_mu.lock();
if (group->which != NULL) {
won = false;
}
else {
group->which = waiter;
won = true;
}
group->_mu.unlock();
return won;
}
// wait waits for winning case of group to complete.
void _WaitGroup::wait() {
_WaitGroup *group = this;
group->_sema.acquire();
}
// wakeup wakes up the group.
//
// prior to wakeup try_to_win must have been called.
// in practice this means that waiters queued to chan.{_send|_recv}q must
// be dequeued with _dequeWaiter.
void _WaitGroup::wakeup() {
_WaitGroup *group = this;
if (group->which == NULL)
bug("wakeup: group.which=nil");
group->_sema.release();
}
// _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
//
// the channel owning {_recv|_send}q must be locked.
_RecvSendWaiting *_dequeWaiter(list_head *queue) {
while (!list_empty(queue)) {
_RecvSendWaiting *w = list_entry(queue->next, _RecvSendWaiting, in_rxtxq);
list_del_init(&w->in_rxtxq); // _init is important as we can try to remove the
// waiter the second time in select.
// if this waiter can win its group - return it.
// if not - someone else from its group already has won, and so we anyway have
// to remove the waiter from the queue.
if (w->group->try_to_win(w)) {
return w;
}
}
return NULL;
}
// _makechan creates new _chan(elemsize, size).
//
// returned channel has refcnt=1.
_chan *_makechan(unsigned elemsize, unsigned size) {
_chan *ch;
ch = (_chan *)malloc(sizeof(_chan) + size*elemsize);
if (ch == NULL)
return NULL;
memset((void *)ch, 0, sizeof(*ch));
new (&ch->_mu) Sema();
ch->_refcnt = 1;
ch->_cap = size;
ch->_elemsize = elemsize;
ch->_closed = false;
INIT_LIST_HEAD(&ch->_recvq);
INIT_LIST_HEAD(&ch->_sendq);
return ch;
}
// _chanxincref increments reference counter of the channel.
//
// it is noop if ch=nil.
void _chanxincref(_chan *ch) {
if (ch == NULL)
return;
ch->incref();
}
void _chan::incref() {
_chan *ch = this;
int refcnt_was = ch->_refcnt.fetch_add(+1);
if (refcnt_was < 1)
panic("chan: incref: refcnt was < 1");
}
// _chanxdecref decrements reference counter of the channel.
//
// if refcnt goes to zero, the channel is deallocated.
// it is noop if ch=nil.
void _chanxdecref(_chan *ch) {
if (ch == NULL)
return;
ch->decref();
}
void _chan::decref() {
_chan *ch = this;
int refcnt_was = ch->_refcnt.fetch_add(-1);
if (refcnt_was < 1)
panic("chan: decref: refcnt was < 1");
if (refcnt_was != 1)
return;
// refcnt=0 -> free the channel
ch->_mu.~Mutex();
memset((void *)ch, 0, sizeof(*ch) + ch->_cap*ch->_elemsize);
free(ch);
}
// _chanrefcnt returns current reference counter of the channel.
//
// NOTE if returned refcnt is > 1, the caller, due to concurrent execution of
// other goroutines, cannot generally assume that the reference counter won't change.
int _chanrefcnt(_chan *ch) {
return ch->refcnt();
}
int _chan::refcnt() {
_chan *ch = this;
return ch->_refcnt;
}
void _blockforever();
// send sends data to a receiver.
//
// sizeof(*ptx) must be ch._elemsize.
void _chansend(_chan *ch, const void *ptx) {
if (ch == NULL) // NOTE: cannot do this check in _chan::send
_blockforever(); // (C++ assumes `this` is never NULL and optimizes it out)
ch->send(ptx);
}
template<> void _chan::_send2</*onstack=*/true> (const void *ptx);
template<> void _chan::_send2</*onstack=*/false>(const void *ptx);
void _chan::send(const void *ptx) {
_chan *ch = this;
ch->_mu.lock();
bool done = ch->_trysend(ptx);
if (done)
return;
(_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? ch->_send2</*onstack=*/false>(ptx)
: ch->_send2</*onstack=*/true >(ptx);
}
template<> void _chan::_send2</*onstack=*/true> (const void *ptx) {
_WaitGroup g;
_RecvSendWaiting me;
__send2(ptx, &g, &me);
}
template<> void _chan::_send2</*onstack=*/false>(const void *ptx) { _chan *ch = this;
unique_ptr<_WaitGroup> g (new _WaitGroup);
unique_ptr<_RecvSendWaiting> me (new _RecvSendWaiting);
// ptx stack -> heap (if ptx is on stack) TODO avoid copy if ptx is !onstack
void *ptx_onheap = malloc(ch->_elemsize);
if (ptx_onheap == NULL) {
ch->_mu.unlock();
throw bad_alloc();
}
memcpy(ptx_onheap, ptx, ch->_elemsize);
defer([&]() {
free(ptx_onheap);
});
__send2(ptx_onheap, g.get(), me.get());
}
void _chan::__send2(const void *ptx, _WaitGroup *g, _RecvSendWaiting *me) { _chan *ch = this;
me->init(g, ch);
me->pdata = (void *)ptx; // we add it to _sendq; the memory will be only read
me->ok = false;
list_add_tail(&me->in_rxtxq, &ch->_sendq);
ch->_mu.unlock();
g->wait();
if (g->which != me)
bug("chansend: g.which != me");
if (!me->ok)
panic("send on closed channel");
}
// recv_ is "comma-ok" version of recv.
//
// ok is true - if receive was delivered by a successful send.
// ok is false - if receive is due to channel being closed and empty.
//
// sizeof(*prx) must be ch._elemsize | prx=NULL.
bool _chanrecv_(_chan *ch, void *prx) {
if (ch == NULL)
_blockforever();
return ch->recv_(prx);
}
template<> bool _chan::_recv2_</*onstack=*/true> (void *prx);
template<> bool _chan::_recv2_</*onstack=*/false>(void *prx);
bool _chan::recv_(void *prx) { // -> ok
_chan *ch = this;
ch->_mu.lock();
bool ok, done = ch->_tryrecv(prx, &ok);
if (done)
return ok;
return (_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? ch->_recv2_</*onstack=*/false>(prx)
: ch->_recv2_</*onstack=*/true> (prx);
}
template<> bool _chan::_recv2_</*onstack=*/true> (void *prx) {
_WaitGroup g;
_RecvSendWaiting me;
return __recv2_(prx, &g, &me);
}
template<> bool _chan::_recv2_</*onstack=*/false>(void *prx) { _chan *ch = this;
unique_ptr<_WaitGroup> g (new _WaitGroup);
unique_ptr<_RecvSendWaiting> me (new _RecvSendWaiting);
if (prx == NULL)
return __recv2_(prx, g.get(), me.get());
// prx stack -> onheap + copy back (if prx is on stack) TODO avoid copy if prx is !onstack
void *prx_onheap = malloc(ch->_elemsize);
if (prx_onheap == NULL) {
ch->_mu.unlock();
throw bad_alloc();
}
defer([&]() {
free(prx_onheap);
});
bool ok = __recv2_(prx_onheap, g.get(), me.get());
memcpy(prx, prx_onheap, ch->_elemsize);
return ok;
}
bool _chan::__recv2_(void *prx, _WaitGroup *g, _RecvSendWaiting *me) { _chan *ch = this;
me->init(g, ch);
me->pdata = prx;
me->ok = false;
list_add_tail(&me->in_rxtxq, &ch->_recvq);
ch->_mu.unlock();
g->wait();
if (g->which != me)
bug("chanrecv: g.which != me");
return me->ok;
}
// recv receives from the channel.
//
// if prx != NULL received value is put into *prx.
void _chanrecv(_chan *ch, void *prx) {
if (ch == NULL)
_blockforever();
ch->recv(prx);
}
void _chan::recv(void *prx) {
_chan *ch = this;
(void)ch->recv_(prx);
return;
}
// _trysend(ch, *ptx) -> done
//
// must be called with ._mu held.
// if done or panic - returns with ._mu released.
// if !done - returns with ._mu still being held.
bool _chan::_trysend(const void *ptx) { // -> done
_chan *ch = this;
if (ch->_closed) {
ch->_mu.unlock();
panic("send on closed channel");
}
// synchronous channel
if (ch->_cap == 0) {
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv == NULL)
return false;
ch->_mu.unlock();
if (recv->pdata != NULL)
memcpy(recv->pdata, ptx, ch->_elemsize);
recv->wakeup(/*ok=*/true);
return true;
}
// buffered channel
else {
if (ch->_dataq_n >= ch->_cap)
return false;
ch->_dataq_append(ptx);
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv != NULL) {
ch->_dataq_popleft(recv->pdata);
ch->_mu.unlock();
recv->wakeup(/*ok=*/true);
} else {
ch->_mu.unlock();
}
return true;
}
}
// _tryrecv() -> (*prx, *pok), done
//
// must be called with ._mu held.
// if done or panic - returns with ._mu released.
// if !done - returns with ._mu still being held.
//
// if !done - (*prx, *pok) are left unmodified.
// if prx=NULL received value is not copied into *prx.
bool _chan::_tryrecv(void *prx, bool *pok) { // -> done
_chan *ch = this;
// buffered
if (ch->_dataq_n > 0) {
ch->_dataq_popleft(prx);
*pok = true;
// wakeup a blocked writer, if there is any
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send != NULL) {
ch->_dataq_append(send->pdata);
ch->_mu.unlock();
send->wakeup(/*ok=*/true);
} else {
ch->_mu.unlock();
}
return true;
}
// closed
if (ch->_closed) {
ch->_mu.unlock();
if (prx != NULL)
memset(prx, 0, ch->_elemsize);
*pok = false;
return true;
}
// sync | empty: there is waiting writer
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send == NULL)
return false;
ch->_mu.unlock();
if (prx != NULL)
memcpy(prx, send->pdata, ch->_elemsize);
*pok = true;
send->wakeup(/*ok=*/true);
return true;
}
// close closes sending side of the channel.
void _chanclose(_chan *ch) {
if (ch == NULL)
panic("close of nil channel");
ch->close();
}
void _chan::close() {
_chan *ch = this;
ch->_mu.lock();
if (ch->_closed) {
ch->_mu.unlock();
panic("close of closed channel");
}
ch->_closed = true;
// wake-up all readers
while (1) {
_RecvSendWaiting *recv = _dequeWaiter(&ch->_recvq);
if (recv == NULL)
break;
ch->_mu.unlock();
if (recv->pdata != NULL)
memset(recv->pdata, 0, ch->_elemsize);
recv->wakeup(/*ok=*/false);
ch->_mu.lock();
}
// wake-up all writers (they will panic)
while (1) {
_RecvSendWaiting *send = _dequeWaiter(&ch->_sendq);
if (send == NULL)
break;
ch->_mu.unlock();
send->wakeup(/*ok=*/false);
ch->_mu.lock();
}
ch->_mu.unlock();
}
// len returns current number of buffered elements.
unsigned _chanlen(_chan *ch) {
if (ch == NULL)
return 0; // len(nil) = 0
return ch->len();
}
unsigned _chan::len() {
_chan *ch = this;
ch->_mu.lock(); // only to make valgrind happy
unsigned len = ch->_dataq_n;
ch->_mu.unlock();
return len;
}
// cap returns channel capacity.
unsigned _chancap(_chan *ch) {
if (ch == NULL)
return 0; // cap(nil) = 0
return ch->cap();
}
unsigned _chan::cap() {
_chan *ch = this;
return ch->_cap;
}
// _dataq_append appends next element to ch._dataq.
// called with ch._mu locked.
void _chan::_dataq_append(const void *ptx) {
_chan *ch = this;
if (ch->_dataq_n >= ch->_cap)
bug("chan: dataq.append on full dataq");
if (ch->_dataq_w >= ch->_cap)
bug("chan: dataq.append: w >= cap");
memcpy(&((char *)(ch+1))[ch->_dataq_w * ch->_elemsize], ptx, ch->_elemsize);
ch->_dataq_w++; ch->_dataq_w %= ch->_cap;
ch->_dataq_n++;
}
// _dataq_popleft pops oldest element from ch._dataq into *prx.
// called with ch._mu locked.
// if prx=NULL the element is popped, but not copied anywhere.
void _chan::_dataq_popleft(void *prx) {
_chan *ch = this;
if (ch->_dataq_n == 0)
bug("chan: dataq.popleft on empty dataq");
if (ch->_dataq_r >= ch->_cap)
bug("chan: dataq.popleft: r >= cap");
if (prx != NULL)
memcpy(prx, &((char *)(ch+1))[ch->_dataq_r * ch->_elemsize], ch->_elemsize);
ch->_dataq_r++; ch->_dataq_r %= ch->_cap;
ch->_dataq_n--;
}
// ---- select ----
// _default represents default case for _select.
const _selcase _default = {
.ch = NULL,
.op = _DEFAULT,
.data = NULL,
.rxok = NULL,
};
static const _RecvSendWaiting _sel_txrx_prepoll_won;
template<bool onstack> static int _chanselect2(const _selcase *, int, const vector<int>&);
template<> int _chanselect2</*onstack=*/true> (const _selcase *, int, const vector<int>&);
template<> int _chanselect2</*onstack=*/false>(const _selcase *, int, const vector<int>&);
static int __chanselect2(const _selcase *, int, const vector<int>&, _WaitGroup*);
// _chanselect executes one ready send or receive channel case.
//
// if no case is ready and default case was provided, select chooses default.
// if no case is ready and default was not provided, select blocks until one case becomes ready.
//
// returns: selected case number.
//
// For example:
//
// _selcase sel[4];
// sel[0] = _selsend(chi, &i);
// sel[1] = _selrecv(chp, &p);
// sel[2] = _selrecv_(chi, &j, &jok);
// sel[3] = _default;
// _ = _chanselect(sel, 4);
//
// See `select` for user-friendly wrapper.
// NOTE casev is not modified and can be used for next _chanselect calls.
int _chanselect(const _selcase *casev, int casec) {
if (casec < 0)
panic("select: casec < 0");
// select promise: if multiple cases are ready - one will be selected randomly
vector<int> nv(casec); // n -> n(case) TODO -> caller stack-allocate nv
for (int i=0; i <casec; i++)
nv[i] = i;
std::random_shuffle(nv.begin(), nv.end());
// first pass: poll all cases and bail out in the end if default was provided
int ndefault = -1;
bool havenonnil = false; // whether we have at least one !nil channel
for (auto n : nv) {
const _selcase *cas = &casev[n];
_chan *ch = cas->ch;
// default: remember we have it
if (cas->op == _DEFAULT) {
if (ndefault != -1)
panic("select: multiple default");
ndefault = n;
}
// send
else if (cas->op == _CHANSEND) {
if (ch != NULL) { // nil chan is never ready
ch->_mu.lock();
if (1) {
bool done = ch->_trysend(cas->data);
if (done)
return n;
}
ch->_mu.unlock();
havenonnil = true;
}
}
// recv
else if (cas->op == _CHANRECV) {
if (ch != NULL) { // nil chan is never ready
ch->_mu.lock();
if (1) {
bool ok, done = ch->_tryrecv(cas->data, &ok);
if (done) {
if (cas->rxok != NULL)
*cas->rxok = ok;
return n;
}
}
ch->_mu.unlock();
havenonnil = true;
}
}
// bad
else {
panic("select: invalid op");
}
}
// execute default if we have it
if (ndefault != -1)
return ndefault;
// select{} or with nil-channels only -> block forever
if (!havenonnil)
_blockforever();
// second pass: subscribe and wait on all rx/tx cases
return (_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? _chanselect2</*onstack=*/false>(casev, casec, nv)
: _chanselect2</*onstack=*/true> (casev, casec, nv);
}
template<> int _chanselect2</*onstack=*/true> (const _selcase *casev, int casec, const vector<int>& nv) {
_WaitGroup g;
return __chanselect2(casev, casec, nv, &g);
}
template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec, const vector<int>& nv) {
unique_ptr<_WaitGroup> g (new _WaitGroup);
int i;
unsigned rxmax=0, txtotal=0;
// reallocate chan .tx / .rx to heap; adjust casev
// XXX avoid doing this if all .tx and .rx are on heap?
unique_ptr<_selcase[]> casev_onheap (new _selcase[casec]);
for (i = 0; i < casec; i++) {
const _selcase *cas = &casev[i];
casev_onheap[i] = *cas;
if (cas->ch == NULL) // nil chan
continue;
if (cas->op == _CHANSEND) {
txtotal += cas->ch->_elemsize;
}
else if (cas->op == _CHANRECV) {
rxmax = max(rxmax, cas->ch->_elemsize);
}
else {
bug("select: invalid op ; _chanselect2: !onstack: A");
}
}
// tx are appended sequentially; all rx go to &rxtxdata[0]
char *rxtxdata = (char *)malloc(max(rxmax, txtotal));
if (rxtxdata == NULL)
throw bad_alloc();
defer([&]() {
free(rxtxdata);
});
char *ptx = rxtxdata;
for (i = 0; i <casec; i++) {
_selcase *cas = &casev_onheap[i];
if (cas->ch == NULL) // nil chan
continue;
if (cas->op == _CHANSEND) {
memcpy(ptx, cas->data, cas->ch->_elemsize);
cas->data = ptx;
ptx += cas->ch->_elemsize;
}
else if (cas->op == _CHANRECV) {
cas->data = rxtxdata;
} else {
bug("select: invalid op ; _chanselect2: !onstack: B");
}
}
// select ...
int selected = __chanselect2(casev_onheap.get(), casec, nv, g.get());
// copy data back to original rx location.
_selcase *cas = &casev_onheap[selected];
if (cas->op == _CHANRECV) {
const _selcase *cas0 = &casev[selected];
if (cas0->data != NULL)
memcpy(cas0->data, cas->data, cas->ch->_elemsize);
}
return selected;
}
static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv, _WaitGroup* g) {
// storage for waiters we create XXX stack-allocate (if !STACK_DEAD_WHILE_PARKED)
// XXX or let caller stack-allocate? but then we force it to know sizeof(_RecvSendWaiting)
_RecvSendWaiting *waitv = (_RecvSendWaiting *)calloc(sizeof(_RecvSendWaiting), casec);
int waitc = 0;
if (waitv == NULL)
throw bad_alloc();
// on exit: remove all registered waiters from their wait queues.
defer([&]() {
for (int i = 0; i < waitc; i++) {
_RecvSendWaiting *w = &waitv[i];
w->chan->_mu.lock();
list_del_init(&w->in_rxtxq); // thanks to _init used in _dequeWaiter
w->chan->_mu.unlock(); // it is ok to del twice even if w was already removed
}
free(waitv);
waitv = NULL;
});
for (auto n : nv) {
const _selcase *cas = &casev[n];
_chan *ch = cas->ch;
if (ch == NULL) // nil chan is never ready
continue;
ch->_mu.lock();
with_lock(g->_mu); // with, because _trysend may panic
// a case that we previously queued already won while we were
// queuing other cases.
if (g->which != NULL) {
ch->_mu.unlock();
goto ready;
}
// send
if (cas->op == _CHANSEND) {
bool done = ch->_trysend(cas->data);
if (done) {
g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win
return n;
}
if (waitc >= casec)
bug("select: waitv overflow");
_RecvSendWaiting *w = &waitv[waitc++];
w->init(g, ch);
w->pdata = cas->data;
w->ok = false;
w->sel_n = n;
list_add_tail(&w->in_rxtxq, &ch->_sendq);
}
// recv
else if (cas->op == _CHANRECV) {
bool ok, done = ch->_tryrecv(cas->data, &ok);
if (done) {
g->which = &_sel_txrx_prepoll_won; // !NULL not to let already queued cases win
if (cas->rxok != NULL)
*cas->rxok = ok;
return n;
}
if (waitc >= casec)
bug("select: waitv overflow");
_RecvSendWaiting *w = &waitv[waitc++];
w->init(g, ch);
w->pdata = cas->data;
w->ok = false;
w->sel_n = n;
list_add_tail(&w->in_rxtxq, &ch->_recvq);
}
// bad
else {
bug("select: invalid op during phase 2");
}
ch->_mu.unlock();
}
// wait for a case to become ready
g->wait();
ready:
if (g->which == &_sel_txrx_prepoll_won)
bug("select: woke up with g.which=_sel_txrx_prepoll_won");
const _RecvSendWaiting *sel = g->which;
int selected = sel->sel_n;
const _selcase *cas = &casev[selected];
if (cas->op == _CHANSEND) {
if (!sel->ok)
panic("send on closed channel");
return selected;
}
else if (cas->op == _CHANRECV) {
if (cas->rxok != NULL)
*cas->rxok = sel->ok;
return selected;
}
bug("select: selected case has invalid op");
}
// _blockforever blocks current goroutine forever.
void (*_tblockforever)() = NULL;
void _blockforever() {
if (_tblockforever != NULL)
_tblockforever();
// take a lock twice. It will forever block on the second lock attempt.
// Under gevent, similarly to Go, this raises "LoopExit: This operation
// would block forever", if there are no other greenlets scheduled to be run.
Sema dead;
dead.acquire();
dead.acquire();
bug("_blockforever: woken up");
}
// ---- for tests ----
// _tchanlenrecvqlen returns len(_ch._recvq)
int _tchanrecvqlen(_chan *_ch) {
int l = 0;
list_head *h;
_ch->_mu.lock();
list_for_each(h, &_ch->_recvq)
l++;
_ch->_mu.unlock();
return l;
}
// _tchanlensendqlen returns len(_ch._sendq)
int _tchansendqlen(_chan *_ch) {
int l = 0;
list_head *h;
_ch->_mu.lock();
list_for_each(h, &_ch->_sendq)
l++;
_ch->_mu.unlock();
return l;
}
} // golang::
......
......@@ -20,15 +20,265 @@
// Test that exercises C++-level libgolang.h API and functionality.
#include "golang/libgolang.h"
#include <stdio.h>
#include <tuple>
#include <utility>
using namespace golang;
using std::function;
using std::move;
using std::tie;
#define __STR(X) #X
#define STR(X) __STR(X)
#define ASSERT(COND) do { \
if (!(COND)) \
panic(__FILE__ ":" STR(__LINE__) " assert `" #COND "` failed"); \
} while(0)
// verify chan<T> automatic reference counting.
void _test_chan_cpp_refcount() {
chan<int> ch;
ASSERT(ch == NULL);
ASSERT(!(ch != NULL));
ASSERT(ch._rawchan() == NULL);
ch = makechan<int>();
ASSERT(!(ch == NULL));
ASSERT(ch != NULL);
ASSERT(ch._rawchan() != NULL);
_chan *_ch = ch._rawchan();
ASSERT(_chanrefcnt(_ch) == 1);
// copy ctor
{
chan<int> ch2(ch);
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 2);
ASSERT(ch2 == ch);
ASSERT(!(ch2 != ch));
// ch2 goes out of scope, decref'ing via ~chan
}
ASSERT(_chanrefcnt(_ch) == 1);
// copy =
{
chan<int> ch2;
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ch2 = ch;
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 2);
ASSERT(ch2 == ch);
ASSERT(!(ch2 != ch));
ch2 = NULL;
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ASSERT(_chanrefcnt(_ch) == 1);
ASSERT(!(ch2 == ch));
ASSERT(ch2 != ch);
}
ASSERT(_chanrefcnt(_ch) == 1);
// move ctor
chan<int> ch2(move(ch));
ASSERT(ch == NULL);
ASSERT(ch._rawchan() == NULL);
ASSERT(ch2 != NULL);
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 1);
// move =
ch = move(ch2);
ASSERT(ch != NULL);
ASSERT(ch._rawchan() == _ch);
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ASSERT(_chanrefcnt(_ch) == 1);
// ch goes out of scope and destroys raw channel
}
// verify chan<T> IO.
struct Point {
int x, y;
};
void _test_chan_cpp() {
chan<structZ> done = makechan<structZ>();
chan<int> chi = makechan<int>(1);
chan<Point> chp = makechan<Point>(); chp = NULL;
int i, j, _;
Point p;
bool jok;
i=+1; chi.send(i);
j=-1; j = chi.recv();
if (j != i)
panic("send -> recv != I");
i = 2;
_ = select({
done.recvs(), // 0
chi.sends(&i), // 1
chp.recvs(&p), // 2
chi.recvs(&j, &jok), // 3
_default, // 4
});
if (_ != 1)
panic("select: selected !1");
tie(j, jok) = chi.recv_();
if (!(j == 2 && jok == true))
panic("recv_ != (2, true)");
chi.close();
tie(j, jok) = chi.recv_();
if (!(j == 0 && jok == false))
panic("recv_ from closed != (0, false)");
// XXX chan<chan> is currently TODO
//chan<chan<int>> zzz;
}
// waitBlocked waits until either a receive (if rx) or send (if tx) operation
// blocks waiting on the channel.
void waitBlocked(_chan *ch, bool rx, bool tx) {
if (ch == NULL)
panic("wait blocked: called on nil channel");
double t0 = time::now();
while (1) {
if (rx && (_tchanrecvqlen(ch) != 0))
return;
if (tx && (_tchansendqlen(ch) != 0))
return;
double now = time::now();
if (now-t0 > 10) // waited > 10 seconds - likely deadlock
panic("deadlock");
time::sleep(0); // yield to another thread / coroutine
}
}
template<typename T> void waitBlocked_RX(chan<T> ch) {
waitBlocked(ch._rawchan(), /*rx=*/true, /*tx=*/0);
}
template<typename T> void waitBlocked_TX(chan<T> ch) {
waitBlocked(ch._rawchan(), /*rx=*/0, /*tx=*/true);
}
// usestack_and_call pushes C-stack down and calls f from that.
// C-stack pushdown is used to make sure that when f will block and switched
// to another g, greenlet will save f's C-stack frame onto heap.
//
// --- ~~~
// stack of another g
// --- ~~~
//
// .
// .
// .
//
// f -> heap
static void usestack_and_call(function<void()> f, int nframes=128) {
if (nframes == 0) {
f();
return;
}
return usestack_and_call(f, nframes-1);
}
// verify that send/recv/select correctly route their onstack arguments through onheap proxies.
void _test_chan_vs_stackdeadwhileparked() {
// problem: under greenlet g's stack lives on system stack and is swapped as needed
// onto heap and back on g switch. This way if e.g. recv() is called with
// prx pointing to stack, and the stack is later copied to heap and replaced
// with stack of another g, the sender, if writing to original prx directly,
// will write to stack of different g, and original recv g, after wakeup,
// will see unchanged memory - with stack content that was saved to heap.
//
// to avoid this, send/recv/select create onheap proxies for onstack
// arguments and use those proxies as actual argument for send/receive.
// recv
auto ch = makechan<int>();
go([&]() {
waitBlocked_RX(ch);
usestack_and_call([&]() {
ch.send(111);
});
});
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 111)
panic("recv(111) != 111");
});
// send
auto done = makechan<structZ>();
go([&]() {
waitBlocked_TX(ch);
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 222)
panic("recv(222) != 222");
});
done.close();
});
usestack_and_call([&]() {
ch.send(222);
});
done.recv();
// select(recv)
go([&]() {
waitBlocked_RX(ch);
usestack_and_call([&]() {
ch.send(333);
});
});
usestack_and_call([&]() {
int rx = 0;
int _ = select({ch.recvs(&rx)});
if (_ != 0)
panic("select(recv, 333): selected !0");
if (rx != 333)
panic("select(recv, 333): recv != 333");
});
// select(send)
done = makechan<structZ>();
go([&]() {
waitBlocked_TX(ch);
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 444)
panic("recv(444) != 444");
});
done.close();
});
usestack_and_call([&]() {
int tx = 444;
int _ = select({ch.sends(&tx)});
if (_ != 0)
panic("select(send, 444): selected !0");
});
done.recv();
}
// small test to verify C++ go.
static void _work(int i);
static void _work(int i, chan<structZ> done);
void _test_go_cpp() {
go(_work, 111); // not λ to test that go correctly passes arguments
// TODO wait till _work is done
auto done = makechan<structZ>();
go(_work, 111, done); // not λ to test that go correctly passes arguments
done.recv();
}
static void _work(int i) {
static void _work(int i, chan<structZ> done) {
if (i != 111)
panic("_work: i != 111");
done.close();
}
......@@ -26,20 +26,69 @@
#include "golang/libgolang.h"
#include <stdlib.h>
typedef struct Point {
int x, y;
} Point;
void _test_chan_c(void) {
_chan *done = _makechan(0, 0);
_chan *chi = _makechan(sizeof(int), 1);
_chan *chp = NULL;
int i, j, _;
Point p;
bool jok;
i=+1; _chansend(chi, &i);
j=-1; _chanrecv(chi, &j);
if (j != i)
panic("send -> recv != I");
i = 2;
_selcase sel[5];
sel[0] = _selrecv(done, NULL);
sel[1] = _selsend(chi, &i);
sel[2] = _selrecv(chp, &p);
sel[3] = _selrecv_(chi, &j, &jok);
sel[4] = _default;
_ = _chanselect(sel, 5);
if (_ != 1)
panic("select: selected !1");
jok = _chanrecv_(chi, &j);
if (!(j == 2 && jok == true))
panic("recv_ != (2, true)");
_chanclose(chi);
jok = _chanrecv_(chi, &j);
if (!(j == 0 && jok == false))
panic("recv_ from closed != (0, false)");
_chanxdecref(done);
_chanxdecref(chi);
_chanxdecref(chp);
}
// small test to verify C _taskgo.
struct _work_arg{int i;};
struct _work_arg{int i; _chan *done;};
static void _work(void *);
void _test_go_c(void) {
_chan *done = _makechan(0,0);
if (done == NULL)
panic("_makechan -> failed");
struct _work_arg *_ = malloc(sizeof(*_));
if (_ == NULL)
panic("malloc _work_arg -> failed");
_->i = 111;
_->done = done;
_taskgo(_work, _);
// TODO wait till _work is done
_chanrecv(done, NULL);
_chanxdecref(done);
}
static void _work(void *__) {
struct _work_arg *_ = (struct _work_arg *)__;
if (_->i != 111)
panic("_work: i != 111");
_chanclose(_->done);
free(_);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment