Commit 3b241983 authored by Kirill Smelkov's avatar Kirill Smelkov

Port/move channels to C/C++/Pyx

- Move channels implementation to be done in C++ inside libgolang. The
  code and logic is based on previous Python-level channels
  implementation, but the new code is just C++ and does not depend on
  Python nor GIL at all, and so works without GIL if libgolang
  runtime works without GIL(*).

  (*) for example "thread" runtime works without GIL, while "gevent" runtime
      acquires GIL on every semaphore acquire.

  New channels implementation is located in δ(libgolang.cpp).

- Provide low-level C channels API to the implementation. The low-level
  C API was inspired by Libtask[1] and Plan9/Libthread[2].

  [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
  [2] http://9p.io/magic/man2html/2/thread.

- Provide high-level C++ channels API that provides type-safety and
  automatic channel lifetime management.

  Overview of C and C++ APIs is in δ(libgolang.h).

- Expose C++ channels API at Pyx level as Cython/nogil API so that Cython
  programs could use channels with ease and without need to care about
  lifetime management and low-level details.

  Overview of Cython/nogil channels API is in δ(README.rst) and
  δ(_golang.pxd).

- Turn Python channels into a tiny wrapper around chan<PyObject*>.

Implementation note:

- gevent case needs special care because greenlet, which gevent uses,
  swaps coroutine stack from C stack to heap on coroutine park, and
  replaces that space on C stack with stack of activated coroutine
  copied back from heap. This way if an object on g's stack is accessed
  while g is parked it would be memory of another g's stack.

  The channels implementation explicitly cares about this issue so that
  stack -> * channel send, or * -> stack channel receive work correctly.

  It should be noted that the greenlet approach, which it inherits from
  stackless, is not only a bit tricky, but also comes with overhead
  (stack <-> heap copy), and prevents a coroutine from migrating from one
  OS thread to another, as that would change the addresses of on-stack
  things for that coroutine.

  As the latter property prevents using multiple CPUs even if the
  program / runtime are prepared to work without GIL, it would be more
  logical to change gevent/greenlet to use a separate stack for each
  coroutine. That would remove the stack <-> heap copy and the need for
  special care in channels implementation for stack -> stack sends.
  Such approach should be possible to implement with e.g. swapcontext or
  similar mechanism, and a proof of concept of such work wrapped into
  greenlet-compatible API exists[3]. It would be good if at some point
  there would be a chance to explore such approach in Pygolang context.

  [3] https://github.com/python-greenlet/greenlet/issues/113#issuecomment-264529838 and below

Just this patch brings in the following speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               20.0µs ± 1%  15.6µs ± 1%  -21.84%  (p=0.000 n=10+10)
    chan             9.37µs ± 4%  2.89µs ± 6%  -69.12%  (p=0.000 n=10+10)
    select           20.2µs ± 4%   3.4µs ± 5%  -83.20%  (p=0.000 n=8+10)
    def              58.0ns ± 0%  60.0ns ± 0%   +3.45%  (p=0.000 n=8+10)
    func_def         43.8µs ± 1%  43.9µs ± 1%     ~     (p=0.796 n=10+10)
    call             62.4ns ± 1%  63.5ns ± 1%   +1.76%  (p=0.001 n=10+10)
    func_call        1.06µs ± 1%  1.05µs ± 1%   -0.63%  (p=0.002 n=10+10)
    try_finally       136ns ± 0%   137ns ± 0%   +0.74%  (p=0.000 n=9+10)
    defer            2.28µs ± 1%  2.33µs ± 1%   +2.34%  (p=0.000 n=10+10)
    workgroup_empty  48.2µs ± 1%  34.1µs ± 2%  -29.18%  (p=0.000 n=9+10)
    workgroup_raise  58.9µs ± 1%  45.5µs ± 1%  -22.74%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               24.7µs ± 1%  15.9µs ± 1%  -35.72%  (p=0.000 n=9+9)
    chan             11.6µs ± 1%   7.3µs ± 1%  -36.74%  (p=0.000 n=10+10)
    select           22.5µs ± 1%  10.4µs ± 1%  -53.73%  (p=0.000 n=10+10)
    def              55.0ns ± 0%  55.0ns ± 0%     ~     (all equal)
    func_def         43.6µs ± 1%  43.6µs ± 1%     ~     (p=0.684 n=10+10)
    call             63.0ns ± 0%  64.0ns ± 0%   +1.59%  (p=0.000 n=10+10)
    func_call        1.06µs ± 1%  1.07µs ± 1%   +0.45%  (p=0.045 n=10+9)
    try_finally       135ns ± 0%   137ns ± 0%   +1.48%  (p=0.000 n=10+10)
    defer            2.31µs ± 1%  2.33µs ± 1%   +0.89%  (p=0.000 n=10+10)
    workgroup_empty  70.2µs ± 0%  55.8µs ± 0%  -20.63%  (p=0.000 n=10+10)
    workgroup_raise  90.3µs ± 0%  70.9µs ± 1%  -21.51%  (p=0.000 n=9+10)

The whole Cython/nogil work - starting from 8fa3c15b (Start using Cython
and providing Cython/nogil API) to this patch - brings in the following
speedup at Python level:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               92.9µs ± 1%  15.6µs ± 1%  -83.16%  (p=0.000 n=10+10)
    chan             13.9µs ± 1%   2.9µs ± 6%  -79.14%  (p=0.000 n=10+10)
    select           29.7µs ± 6%   3.4µs ± 5%  -88.55%  (p=0.000 n=10+10)
    def              57.0ns ± 0%  60.0ns ± 0%   +5.26%  (p=0.000 n=10+10)
    func_def         44.0µs ± 1%  43.9µs ± 1%     ~     (p=0.055 n=10+10)
    call             63.5ns ± 1%  63.5ns ± 1%     ~     (p=1.000 n=10+10)
    func_call        1.06µs ± 0%  1.05µs ± 1%   -1.31%  (p=0.000 n=10+10)
    try_finally       139ns ± 0%   137ns ± 0%   -1.44%  (p=0.000 n=10+10)
    defer            2.36µs ± 1%  2.33µs ± 1%   -1.26%  (p=0.000 n=10+10)
    workgroup_empty  98.4µs ± 1%  34.1µs ± 2%  -65.32%  (p=0.000 n=10+10)
    workgroup_raise   135µs ± 1%    46µs ± 1%  -66.35%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               68.8µs ± 1%  15.9µs ± 1%  -76.91%  (p=0.000 n=10+9)
    chan             14.8µs ± 1%   7.3µs ± 1%  -50.67%  (p=0.000 n=10+10)
    select           32.0µs ± 0%  10.4µs ± 1%  -67.57%  (p=0.000 n=10+10)
    def              58.0ns ± 0%  55.0ns ± 0%   -5.17%  (p=0.000 n=10+10)
    func_def         43.9µs ± 1%  43.6µs ± 1%   -0.53%  (p=0.035 n=10+10)
    call             63.5ns ± 1%  64.0ns ± 0%   +0.79%  (p=0.033 n=10+10)
    func_call        1.08µs ± 1%  1.07µs ± 1%   -1.74%  (p=0.000 n=10+9)
    try_finally       142ns ± 0%   137ns ± 0%   -3.52%  (p=0.000 n=10+10)
    defer            2.32µs ± 1%  2.33µs ± 1%   +0.71%  (p=0.005 n=10+10)
    workgroup_empty  90.3µs ± 0%  55.8µs ± 0%  -38.26%  (p=0.000 n=10+10)
    workgroup_raise   108µs ± 1%    71µs ± 1%  -34.64%  (p=0.000 n=10+10)

This patch is the final patch in series to reach the goal of providing
channels that could be used in Cython/nogil code.

Cython/nogil channels work is dedicated to the memory of Вера Павловна Супрун[4].

[4] https://navytux.spb.ru/memory/%D0%A2%D1%91%D1%82%D1%8F%20%D0%92%D0%B5%D1%80%D0%B0.pdf#page=3
parent 9efb6575
...@@ -170,7 +170,7 @@ located in `src/` under `$GOPATH`. ...@@ -170,7 +170,7 @@ located in `src/` under `$GOPATH`.
Cython/nogil API Cython/nogil API
---------------- ----------------
Cython package `golang` provides *nogil* API with goroutines and Cython package `golang` provides *nogil* API with goroutines, channels and
other features that mirror corresponding Python package. Cython API is not only other features that mirror corresponding Python package. Cython API is not only
faster compared to Python version, but also, due to *nogil* property, allows to faster compared to Python version, but also, due to *nogil* property, allows to
build concurrent systems without limitations imposed by Python's GIL. All that build concurrent systems without limitations imposed by Python's GIL. All that
...@@ -178,14 +178,59 @@ while still programming in Python-like language. Brief description of ...@@ -178,14 +178,59 @@ while still programming in Python-like language. Brief description of
Cython/nogil API follows: Cython/nogil API follows:
`go` spawns new task - a coroutine, or thread, depending on activated runtime. `go` spawns new task - a coroutine, or thread, depending on activated runtime.
For example:: `chan[T]` represents a channel with Go semantic and elements of type `T`.
Use `makechan[T]` to create new channel, and `chan[T].recv`, `chan[T].send`,
`chan[T].close` for communication. `nil` stands for the nil channel. `select`
can be used to multiplex on several channels. For example::
cdef nogil: cdef nogil:
void worker(): struct Point:
pass int x
int y
void worker(chan[int] chi, chan[Point] chp):
chi.send(1)
cdef Point p
p.x = 3
p.y = 4
chp.send(p)
void myfunc(): void myfunc():
go(worker) cdef chan[int] chi = makechan[int]() # synchronous channel of integers
cdef chan[Point] chp = makechan[Point](3) # channel with buffer of size 3 and Point elements
go(worker, chi, chp)
i = chi.recv() # will give 1
p = chp.recv() # will give Point(3,4)
chp = nil # rebind chp to nil channel
cdef cbool ok
cdef int j = 33
_ = select([
chi.recvs(&i), # 0
chi.recvs(&i, &ok), # 1
chi.sends(&j), # 2
chp.recvs(&p), # 3
default, # 4
])
if _ == 0:
# i is what was received from chi
...
if _ == 1:
# (i, ok) is what was received from chi
...
if _ == 2:
# we know j was sent to chi
...
if _ == 3:
# this case will never be selected because
# send/recv on nil channel blocks forever.
...
if _ == 4:
# default case
...
`panic` stops normal execution of current goroutine by throwing a C-level `panic` stops normal execution of current goroutine by throwing a C-level
exception. On Python/C boundaries C-level exceptions have to be converted to exception. On Python/C boundaries C-level exceptions have to be converted to
......
...@@ -23,6 +23,7 @@ Cython/nogil API ...@@ -23,6 +23,7 @@ Cython/nogil API
---------------- ----------------
- `go` spawns lightweight thread. - `go` spawns lightweight thread.
- `chan[T]`, `makechan[T]` and `select` provide C-level channels with Go semantic.
- `panic` stops normal execution of current goroutine by throwing a C-level exception. - `panic` stops normal execution of current goroutine by throwing a C-level exception.
Everything in Cython/nogil API do not depend on Python runtime and in Everything in Cython/nogil API do not depend on Python runtime and in
...@@ -38,10 +39,16 @@ Golang.py runtime ...@@ -38,10 +39,16 @@ Golang.py runtime
In addition to Cython/nogil API, golang.pyx provides runtime for golang.py: In addition to Cython/nogil API, golang.pyx provides runtime for golang.py:
- Python-level channels are represented by pychan + pyselect.
- Python-level panic is represented by pypanic. - Python-level panic is represented by pypanic.
""" """
from libcpp cimport nullptr_t, nullptr as nil
from libcpp.utility cimport pair
cdef extern from *:
ctypedef bint cbool "bool"
# nogil pyx-level golang API. # nogil pyx-level golang API.
# #
# NOTE even though many functions may panic (= throw C++ exception) nothing is # NOTE even though many functions may panic (= throw C++ exception) nothing is
...@@ -58,14 +65,63 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil: ...@@ -58,14 +65,63 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
void go(...) # typechecking is done by C void go(...) # typechecking is done by C
struct _chan
cppclass chan[T]:
chan();
# send/recv/close
void send(const T&)
T recv()
pair[T, cbool] recv_()
void close()
# send/recv in select
_selcase sends(const T *ptx)
_selcase recvs()
_selcase recvs(T* prx)
_selcase recvs(T* prx, cbool *pok)
# length/capacity
unsigned len()
unsigned cap()
# compare wrt nil; =nil
cbool operator==(nullptr_t)
cbool operator!=(nullptr_t)
void operator=(nullptr_t)
# for tests
_chan *_rawchan()
chan[T] makechan[T]()
chan[T] makechan[T](unsigned size)
struct structZ:
pass
enum _chanop:
_CHANSEND
_CHANRECV
_DEFAULT
struct _selcase:
_chanop op
void *data
cbool *rxok
const _selcase default "golang::_default"
int select(_selcase casev[])
# ---- python bits ---- # ---- python bits ----
cdef void topyexc() except * cdef void topyexc() except *
cpdef pypanic(arg) cpdef pypanic(arg)
# pychan is chan<object>
from cpython cimport PyObject
ctypedef PyObject *pPyObject # https://github.com/cython/cython/issues/534
from cython cimport final from cython cimport final
@final @final
cdef class pychan: cdef class pychan:
cdef dict __dict__ cdef chan[pPyObject] ch
This diff is collapsed.
...@@ -26,21 +26,30 @@ ...@@ -26,21 +26,30 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang cimport go, pychan, panic, pypanic, topyexc from golang cimport go, chan, _chan, makechan, pychan, nil, select, \
from golang import nilchan default, structZ, panic, pypanic, topyexc, cbool
from golang import _golang
from golang import time cdef extern from "golang/libgolang.h" namespace "golang" nogil:
int _tchanrecvqlen(_chan *ch)
int _tchansendqlen(_chan *ch)
void (*_tblockforever)()
# pylen_{recv,send}q returns len(pych._{recv,send}q) # pylen_{recv,send}q returns len(_chan._{recv,send}q)
def pylen_recvq(pychan pych not None): # -> int def pylen_recvq(pychan pych not None): # -> int
if pych is nilchan: if pych.ch == nil:
raise AssertionError('len(.recvq) on nil channel') raise AssertionError('len(.recvq) on nil channel')
return len(pych._recvq) return _tchanrecvqlen(pych.ch._rawchan())
def pylen_sendq(pychan pych not None): # -> int def pylen_sendq(pychan pych not None): # -> int
if pych is nilchan: if pych.ch == nil:
raise AssertionError('len(.sendq) on nil channel') raise AssertionError('len(.sendq) on nil channel')
return len(pych._sendq) return _tchansendqlen(pych.ch._rawchan())
# runtime/libgolang_test.cpp
cdef extern from *:
"""
extern void waitBlocked(golang::_chan *ch, bool rx, bool tx);
"""
void waitBlocked(_chan *, bint rx, bint tx) nogil except +topyexc
# pywaitBlocked waits till a receive or send pychan operation blocks waiting on the channel. # pywaitBlocked waits till a receive or send pychan operation blocks waiting on the channel.
# #
...@@ -49,7 +58,8 @@ def pywaitBlocked(pychanop): ...@@ -49,7 +58,8 @@ def pywaitBlocked(pychanop):
if pychanop.__self__.__class__ is not pychan: if pychanop.__self__.__class__ is not pychan:
pypanic("wait blocked: %r is method of a non-chan: %r" % (pychanop, pychanop.__self__.__class__)) pypanic("wait blocked: %r is method of a non-chan: %r" % (pychanop, pychanop.__self__.__class__))
cdef pychan pych = pychanop.__self__ cdef pychan pych = pychanop.__self__
recv = send = False cdef bint recv = False
cdef bint send = False
if pychanop.__name__ == "recv": # XXX better check PyCFunction directly if pychanop.__name__ == "recv": # XXX better check PyCFunction directly
recv = True recv = True
elif pychanop.__name__ == "send": # XXX better check PyCFunction directly elif pychanop.__name__ == "send": # XXX better check PyCFunction directly
...@@ -57,41 +67,84 @@ def pywaitBlocked(pychanop): ...@@ -57,41 +67,84 @@ def pywaitBlocked(pychanop):
else: else:
pypanic("wait blocked: unexpected chan method: %r" % (pychanop,)) pypanic("wait blocked: unexpected chan method: %r" % (pychanop,))
t0 = time.now() with nogil:
while 1: waitBlocked(pych.ch._rawchan(), recv, send)
with pych._mu:
if recv and pylen_recvq(pych) > 0:
return
if send and pylen_sendq(pych) > 0:
return
now = time.now()
if now-t0 > 10: # waited > 10 seconds - likely deadlock
pypanic("deadlock")
time.sleep(0) # yield to another thread / coroutine
# `with pypanicWhenBlocked` hooks into _golang._blockforever to raise panic with # `with pypanicWhenBlocked` hooks into libgolang _blockforever to raise panic with
# "t: blocks forever" instead of blocking. # "t: blocks forever" instead of blocking.
cdef class pypanicWhenBlocked: cdef class pypanicWhenBlocked:
def __enter__(t): def __enter__(t):
assert _golang._tblockforever is None global _tblockforever
_golang._tblockforever = _panicblocked _tblockforever = _panicblocked
return t return t
def __exit__(t, typ, val, tb): def __exit__(t, typ, val, tb):
_golang._tblockforever = None _tblockforever = NULL
def _panicblocked(): cdef void _panicblocked() nogil:
pypanic("t: blocks forever") panic("t: blocks forever")
# small test to verify pyx(nogil) channels.
ctypedef struct Point:
int x
int y
# TODO kill this and teach Cython to coerce pair[X,Y] -> (X,Y)
cdef (int, cbool) recv_(chan[int] ch) nogil:
_ = ch.recv_()
return (_.first, _.second)
cdef void _test_chan_nogil() nogil except +topyexc:
cdef chan[structZ] done = makechan[structZ]()
cdef chan[int] chi = makechan[int](1)
cdef chan[Point] chp = makechan[Point]()
chp = nil # reset to nil
cdef int i, j
cdef Point p
cdef cbool jok
i=+1; chi.send(i)
j=-1; j = chi.recv()
if not (j == i):
panic("send -> recv != I")
i = 2
_=select([
done.recvs(), # 0
chi.sends(&i), # 1
chp.recvs(&p), # 2
chi.recvs(&j, &jok), # 3
default, # 4
])
if _ != 1:
panic("select: selected !1")
j, jok = recv_(chi)
if not (j == 2 and jok == True):
panic("recv_ != (2, true)")
chi.close()
j, jok = recv_(chi)
if not (j == 0 and jok == False):
panic("recv_ from closed != (0, false)")
def test_chan_nogil():
with nogil:
_test_chan_nogil()
# small test to verify pyx(nogil) go. # small test to verify pyx(nogil) go.
cdef void _test_go_nogil() nogil except +topyexc: cdef void _test_go_nogil() nogil except +topyexc:
go(_work, 111) cdef chan[structZ] done = makechan[structZ]()
# TODO wait till _work is done go(_work, 111, done)
cdef void _work(int i) nogil: done.recv()
cdef void _work(int i, chan[structZ] done) nogil:
if i != 111: if i != 111:
panic("_work: i != 111") panic("_work: i != 111")
done.close()
def test_go_nogil(): def test_go_nogil():
with nogil: with nogil:
...@@ -101,9 +154,14 @@ def test_go_nogil(): ...@@ -101,9 +154,14 @@ def test_go_nogil():
# runtime/libgolang_test_c.c # runtime/libgolang_test_c.c
cdef extern from * nogil: cdef extern from * nogil:
""" """
extern "C" void _test_chan_c();
extern "C" void _test_go_c(); extern "C" void _test_go_c();
""" """
void _test_chan_c() except +topyexc
void _test_go_c() except +topyexc void _test_go_c() except +topyexc
def test_chan_c():
with nogil:
_test_chan_c()
def test_go_c(): def test_go_c():
with nogil: with nogil:
_test_go_c() _test_go_c()
...@@ -111,9 +169,24 @@ def test_go_c(): ...@@ -111,9 +169,24 @@ def test_go_c():
# runtime/libgolang_test.cpp # runtime/libgolang_test.cpp
cdef extern from * nogil: cdef extern from * nogil:
""" """
extern void _test_chan_cpp_refcount();
extern void _test_chan_cpp();
extern void _test_chan_vs_stackdeadwhileparked();
extern void _test_go_cpp(); extern void _test_go_cpp();
""" """
void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc
void _test_chan_vs_stackdeadwhileparked() except +topyexc
void _test_go_cpp() except +topyexc void _test_go_cpp() except +topyexc
def test_chan_cpp_refcount():
with nogil:
_test_chan_cpp_refcount()
def test_chan_cpp():
with nogil:
_test_chan_cpp()
def test_chan_vs_stackdeadwhileparked():
with nogil:
_test_chan_vs_stackdeadwhileparked()
def test_go_cpp(): def test_go_cpp():
with nogil: with nogil:
_test_go_cpp() _test_go_cpp()
...@@ -159,6 +159,11 @@ def test_chan(): ...@@ -159,6 +159,11 @@ def test_chan():
assert w2() is not None assert w2() is not None
ch = None ch = None
gc.collect() gc.collect()
# pypy needs another GC run: pychan does Py_DECREF on buffered objects, but
# on pypy cpyext objects are not deallocated from Py_DECREF even if
# ob_refcnt goes to zero - the deallocation is delayed until GC run.
# see also: http://doc.pypy.org/en/latest/discussion/rawrefcount.html
gc.collect()
assert w1() is None assert w1() is None
assert w2() is None assert w2() is None
......
...@@ -22,9 +22,10 @@ ...@@ -22,9 +22,10 @@
// Library Libgolang provides Go-like features for C and C++. // Library Libgolang provides Go-like features for C and C++.
// //
// Library Libgolang provides goroutines and other // Library Libgolang provides goroutines, channels with Go semantic and other
// accompanying features. The library consists of high-level type-safe C++ API, // accompanying features. The library consists of high-level type-safe C++ API,
// and low-level unsafe C API. // and low-level unsafe C API. The low-level C API was inspired by Libtask[1]
// and Plan9/Libthread[2].
// //
// The primary motivation for Libgolang is to serve as runtime for golang.pyx - // The primary motivation for Libgolang is to serve as runtime for golang.pyx -
// - Cython part of Pygolang project. However Libgolang is independent of // - Cython part of Pygolang project. However Libgolang is independent of
...@@ -35,12 +36,29 @@ ...@@ -35,12 +36,29 @@
// C++-level API // C++-level API
// //
// - `go` spawns new task. // - `go` spawns new task.
// - `chan<T>`, and `select` provide channels with Go semantic and automatic
// lifetime management.
// - `sleep` pauses current task. // - `sleep` pauses current task.
// - `panic` throws exception that represent C-level panic. // - `panic` throws exception that represent C-level panic.
// //
// For example: // For example:
// //
// go(worker, 1); // spawn worker(int) // chan<int> ch = makechan<int>(); // create new channel
// go(worker, ch, 1); // spawn worker(chan<int>, int)
// ch.send(1);
// j = ch.recv();
//
// _ = select({
// _default, // 0
// ch.sends(&i), // 1
// ch.recvs(&j), // 2
// });
// if (_ == 0)
// // default case selected
// if (_ == 1)
// // case 1 selected: i sent to ch
// if (_ == 2)
// // case 2 selected: j received from ch
// //
// if (<bug condition>) // if (<bug condition>)
// panic("bug"); // panic("bug");
...@@ -49,6 +67,10 @@ ...@@ -49,6 +67,10 @@
// C-level API // C-level API
// //
// - `_taskgo` spawns new task. // - `_taskgo` spawns new task.
// - `_makechan` creates raw channel with Go semantic.
// - `_chanxincref` and `_chanxdecref` manage channel lifetime.
// - `_chansend` and `_chanrecv` send/receive over raw channel.
// - `_chanselect`, `_selsend`, `_selrecv`, ... provide raw select functionality.
// - `tasknanosleep` pauses current task. // - `tasknanosleep` pauses current task.
// //
// //
...@@ -65,6 +87,10 @@ ...@@ -65,6 +87,10 @@
// //
// Once again, Libgolang itself is independent from Python, and other runtimes // Once again, Libgolang itself is independent from Python, and other runtimes
// are possible. // are possible.
//
//
// [1] Libtask: a Coroutine Library for C and Unix. https://swtch.com/libtask.
// [2] http://9p.io/magic/man2html/2/thread.
#include <stdbool.h> #include <stdbool.h>
#include <stddef.h> #include <stddef.h>
...@@ -104,10 +130,88 @@ LIBGOLANG_API void _taskgo(void (*f)(void *arg), void *arg); ...@@ -104,10 +130,88 @@ LIBGOLANG_API void _taskgo(void (*f)(void *arg), void *arg);
LIBGOLANG_API void _tasknanosleep(uint64_t dt); LIBGOLANG_API void _tasknanosleep(uint64_t dt);
LIBGOLANG_API uint64_t _nanotime(void); LIBGOLANG_API uint64_t _nanotime(void);
typedef struct _chan _chan;
LIBGOLANG_API _chan *_makechan(unsigned elemsize, unsigned size);
LIBGOLANG_API void _chanxincref(_chan *ch);
LIBGOLANG_API void _chanxdecref(_chan *ch);
LIBGOLANG_API int _chanrefcnt(_chan *ch);
LIBGOLANG_API void _chansend(_chan *ch, const void *ptx);
LIBGOLANG_API void _chanrecv(_chan *ch, void *prx);
LIBGOLANG_API bool _chanrecv_(_chan *ch, void *prx);
LIBGOLANG_API void _chanclose(_chan *ch);
LIBGOLANG_API unsigned _chanlen(_chan *ch);
LIBGOLANG_API unsigned _chancap(_chan *ch);
enum _chanop {
_CHANSEND = 0,
_CHANRECV = 1,
_DEFAULT = 2,
};
// _selcase represents one select case.
typedef struct _selcase {
_chan *ch; // channel
enum _chanop op; // chansend/chanrecv/default
void *data; // chansend: ptx; chanrecv: prx
bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used
} _selcase;
LIBGOLANG_API int _chanselect(const _selcase *casev, int casec);
// _selsend builds the _chanselect case that stands for `_chansend(ch, ptx)`.
//
// ch  - channel the case sends to.
// ptx - points to the element to send; it is kept in the case's data field,
//       which send and receive cases share, hence the cast to non-const.
//
// rxok is used by receive cases only and is left NULL here.
static inline
_selcase _selsend(_chan *ch, const void *ptx) {
    _selcase sendcase;
    sendcase.ch   = ch;
    sendcase.op   = _CHANSEND;
    sendcase.data = (void *)ptx;
    sendcase.rxok = NULL;
    return sendcase;
}
// _selrecv builds the _chanselect case that stands for `_chanrecv(ch, prx)`.
//
// ch  - channel the case receives from.
// prx - where the received element is to be stored.
//
// The ok flag is not requested: rxok is left NULL.
static inline
_selcase _selrecv(_chan *ch, void *prx) {
    _selcase recvcase;
    recvcase.ch   = ch;
    recvcase.op   = _CHANRECV;
    recvcase.data = prx;
    recvcase.rxok = NULL;
    return recvcase;
}
// _selrecv_ builds the _chanselect case that stands for
// `*pok = _chanrecv_(ch, prx)`.
//
// ch  - channel the case receives from.
// prx - where the received element is to be stored.
// pok - where the ok flag is to be saved; NULL means the flag is not wanted.
static inline
_selcase _selrecv_(_chan *ch, void *prx, bool *pok) {
    _selcase recvcase;
    recvcase.ch   = ch;
    recvcase.op   = _CHANRECV;
    recvcase.data = prx;
    recvcase.rxok = pok;
    return recvcase;
}
// _default represents default case for _chanselect.
extern LIBGOLANG_API const _selcase _default;
// libgolang runtime - the runtime must be initialized before any other libgolang use. // libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema; typedef struct _libgolang_sema _libgolang_sema;
typedef enum _libgolang_runtime_flags {
// STACK_DEAD_WHILE_PARKED indicates that it is not safe to access
// goroutine's stack memory while the goroutine is parked.
//
// for example gevent/greenlet/stackless use it because they copy g's stack
// to heap on park and back on unpark. This way if objects on g's stack
// were accessed while g was parked it would be memory of another g's stack.
STACK_DEAD_WHILE_PARKED = 1,
} _libgolang_runtime_flags;
typedef struct _libgolang_runtime_ops { typedef struct _libgolang_runtime_ops {
_libgolang_runtime_flags flags;
// go should spawn a task (coroutine/thread/...). // go should spawn a task (coroutine/thread/...).
void (*go)(void (*f)(void *), void *arg); void (*go)(void (*f)(void *), void *arg);
...@@ -136,6 +240,11 @@ typedef struct _libgolang_runtime_ops { ...@@ -136,6 +240,11 @@ typedef struct _libgolang_runtime_ops {
LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops); LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
// for testing
LIBGOLANG_API int _tchanrecvqlen(_chan *ch);
LIBGOLANG_API int _tchansendqlen(_chan *ch);
LIBGOLANG_API extern void (*_tblockforever)(void);
#ifdef __cplusplus #ifdef __cplusplus
}} }}
#endif #endif
...@@ -145,8 +254,12 @@ LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops); ...@@ -145,8 +254,12 @@ LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
#ifdef __cplusplus #ifdef __cplusplus
#include <exception>
#include <functional> #include <functional>
#include <initializer_list>
#include <memory> #include <memory>
#include <type_traits>
#include <utility>
namespace golang { namespace golang {
...@@ -162,6 +275,122 @@ static inline void go(F /*std::function<void(Argv...)>*/ f, Argv... argv) { ...@@ -162,6 +275,122 @@ static inline void go(F /*std::function<void(Argv...)>*/ f, Argv... argv) {
}, frun); }, frun);
} }
template<typename T> class chan;
template<typename T> static chan<T> makechan(unsigned size=0);
// chan<T> provides type-safe wrapper over _chan.
//
// chan<T> is automatically reference-counted and is safe to use from multiple
// goroutines simultaneously.
template<typename T>
class chan {
_chan *_ch;
public:
inline chan() { _ch = NULL; } // nil channel if not explicitly initialized
friend chan<T> makechan<T>(unsigned size);
inline ~chan() { _chanxdecref(_ch); _ch = NULL; }
// = nil
inline chan(nullptr_t) { _ch = NULL; }
inline chan& operator=(nullptr_t) { _chanxdecref(_ch); _ch = NULL; return *this; }
// copy
inline chan(const chan& from) { _ch = from._ch; _chanxincref(_ch); }
inline chan& operator=(const chan& from) {
if (this != &from) {
_chanxdecref(_ch); _ch = from._ch; _chanxincref(_ch);
}
return *this;
}
// move
inline chan(chan&& from) { _ch = from._ch; from._ch = NULL; }
inline chan& operator=(chan&& from) {
if (this != &from) {
_chanxdecref(_ch); _ch = from._ch; from._ch = NULL;
}
return *this;
}
// _chan does plain memcpy to copy elements.
// TODO allow all types (e.g. element=chan )
static_assert(std::is_trivially_copyable<T>::value, "TODO chan<T>: T copy is not trivial");
// send/recv/close
inline void send(const T &ptx) { _chansend(_ch, &ptx); }
inline T recv() { T rx; _chanrecv(_ch, &rx); return rx; }
inline std::pair<T,bool> recv_() { T rx; bool ok = _chanrecv_(_ch, &rx);
return std::make_pair(rx, ok); }
inline void close() { _chanclose(_ch); }
// send/recv in select
// ch.sends creates `ch.send(*ptx)` case for select.
[[nodiscard]] inline _selcase sends(const T *ptx) { return _selsend(_ch, ptx); }
// ch.recvs creates `*prx = ch.recv()` case for select.
//
// if pok is provided the case is extended to `[*prx, *pok] = ch.recv_()`
// if both prx and pok are omitted the case is reduced to `ch.recv()`.
[[nodiscard]] inline _selcase recvs(T *prx=NULL, bool *pok=NULL) {
return _selrecv_(_ch, prx, pok);
}
// length/capacity
inline unsigned len() { return _chanlen(_ch); }
inline unsigned cap() { return _chancap(_ch); }
// compare wrt nil
inline bool operator==(nullptr_t) { return (_ch == NULL); }
inline bool operator!=(nullptr_t) { return (_ch != NULL); }
// compare wrt chan
inline bool operator==(const chan<T>& ch2) { return (_ch == ch2._ch); }
inline bool operator!=(const chan<T>& ch2) { return (_ch != ch2._ch); }
// for testing
inline _chan *_rawchan() { return _ch; }
};
// makechan<T> makes new chan<T> with capacity=size.
//
// The raw channel is created by _makechan; if that returns NULL,
// std::bad_alloc is thrown.
template<typename T> static inline
chan<T> makechan(unsigned size) {
    // empty types (eg struct{}) have sizeof() == 1 - *not* 0 - so the
    // element size for them is forced to 0.
    unsigned elemsize = sizeof(T);
    if (std::is_empty<T>::value)
        elemsize = 0;

    _chan *raw = _makechan(elemsize, size);
    if (raw == NULL)
        throw std::bad_alloc();

    chan<T> ch;
    ch._ch = raw;
    return ch;
}
// structZ is struct{}.
//
// it's a workaround for e.g. makechan<struct{}> giving
// "error: types may not be defined in template arguments".
struct structZ{};
// select, together with chan<T>.sends and chan<T>.recvs, provide type-safe
// wrappers over _chanselect and _selsend/_selrecv/_selrecv_.
//
// Usage example:
//
// _ = select({
// ch1.recvs(&v), // 0
// ch2.recvs(&v, &ok), // 1
// ch2.sends(&v), // 2
// _default, // 3
// });
// select over a brace-enclosed list of cases; returns what _chanselect returns.
static inline // select({case1, case2, case3})
int select(const std::initializer_list<const _selcase> &casev) {
    const _selcase *first = casev.begin();
    int ncase = casev.size();
    return _chanselect(first, ncase);
}
template<size_t N> static inline // select(casev_array)
int select(const _selcase (&casev)[N]) {
return _chanselect(&casev[0], N);
}
namespace time { namespace time {
......
...@@ -23,18 +23,26 @@ ...@@ -23,18 +23,26 @@
# Small program that uses a bit of golang.pyx nogil features, mainly to verify # Small program that uses a bit of golang.pyx nogil features, mainly to verify
# that external project can build against golang in pyx mode. # that external project can build against golang in pyx mode.
from golang cimport go, topyexc from golang cimport go, chan, makechan, topyexc
from libc.stdio cimport printf from libc.stdio cimport printf
cdef nogil: cdef nogil:
void worker(int i, int j): void worker(chan[int] ch, int i, int j):
pass ch.send(i*j)
void _main() except +topyexc: void _main() except +topyexc:
cdef chan[int] ch = makechan[int]()
cdef int i cdef int i
for i in range(3): for i in range(3):
go(worker, i, 4) go(worker, ch, i, 4)
for i in range(3):
ch.recv()
ch.close()
#_, ok = ch.recv_() # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
ch.recv_()
printf("test.pyx: OK\n") printf("test.pyx: OK\n")
......
...@@ -23,8 +23,12 @@ from libc.stdint cimport uint64_t ...@@ -23,8 +23,12 @@ from libc.stdint cimport uint64_t
cdef extern from "golang/libgolang.h" nogil: cdef extern from "golang/libgolang.h" nogil:
struct _libgolang_sema struct _libgolang_sema
enum _libgolang_runtime_flags:
STACK_DEAD_WHILE_PARKED
struct _libgolang_runtime_ops: struct _libgolang_runtime_ops:
_libgolang_runtime_flags flags
void (*go)(void (*f)(void *) nogil, void *arg); void (*go)(void (*f)(void *) nogil, void *arg);
_libgolang_sema* (*sema_alloc) () _libgolang_sema* (*sema_alloc) ()
......
...@@ -42,7 +42,7 @@ from cpython cimport Py_INCREF, Py_DECREF ...@@ -42,7 +42,7 @@ from cpython cimport Py_INCREF, Py_DECREF
from cython cimport final from cython cimport final
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \ from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic STACK_DEAD_WHILE_PARKED, panic
from golang.runtime cimport _runtime_thread from golang.runtime cimport _runtime_thread
...@@ -125,6 +125,10 @@ cdef nogil: ...@@ -125,6 +125,10 @@ cdef nogil:
# XXX const # XXX const
_libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops( _libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops(
# when greenlet is switched to another, its stack is copied to
# heap, and stack of switched-to greenlet is copied back to C stack.
flags = STACK_DEAD_WHILE_PARKED,
go = go, go = go,
sema_alloc = sema_alloc, sema_alloc = sema_alloc,
sema_free = sema_free, sema_free = sema_free,
......
...@@ -80,7 +80,7 @@ cdef extern from "pythread.h" nogil: ...@@ -80,7 +80,7 @@ cdef extern from "pythread.h" nogil:
void PyThread_free_lock(PyThread_type_lock) void PyThread_free_lock(PyThread_type_lock)
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \ from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
panic _libgolang_runtime_flags, panic
from libc.stdint cimport uint64_t, UINT64_MAX from libc.stdint cimport uint64_t, UINT64_MAX
IF POSIX: IF POSIX:
...@@ -168,6 +168,7 @@ cdef nogil: ...@@ -168,6 +168,7 @@ cdef nogil:
# XXX const # XXX const
_libgolang_runtime_ops thread_ops = _libgolang_runtime_ops( _libgolang_runtime_ops thread_ops = _libgolang_runtime_ops(
flags = <_libgolang_runtime_flags>0,
go = go, go = go,
sema_alloc = sema_alloc, sema_alloc = sema_alloc,
sema_free = sema_free, sema_free = sema_free,
......
This diff is collapsed.
...@@ -20,15 +20,265 @@ ...@@ -20,15 +20,265 @@
// Test that exercises C++-level libgolang.h API and functionality. // Test that exercises C++-level libgolang.h API and functionality.
#include "golang/libgolang.h" #include "golang/libgolang.h"
#include <stdio.h>
#include <tuple>
#include <utility>
using namespace golang; using namespace golang;
using std::function;
using std::move;
using std::tie;
// __STR/STR: two-level stringification. STR(X) first expands X (so that e.g.
// STR(__LINE__) yields the line number) and only then turns it into a string
// literal via __STR.
#define __STR(X) #X
#define STR(X) __STR(X)
// ASSERT(COND) panics with "<file>:<line> assert `<COND>` failed" if COND
// evaluates to false. The message is assembled at compile time from adjacent
// string literals; do { } while(0) makes the macro usable as one statement.
#define ASSERT(COND) do { \
if (!(COND)) \
panic(__FILE__ ":" STR(__LINE__) " assert `" #COND "` failed"); \
} while(0)
// verify chan<T> automatic reference counting.
//
// The test walks through default construction, copy construction, copy
// assignment, assignment of NULL, move construction and move assignment, and
// after every step checks the reference count reported by _chanrefcnt for the
// underlying raw channel together with the ==/!= comparisons of the wrappers.
void _test_chan_cpp_refcount() {
// a default-constructed chan is expected to compare equal to NULL and to
// have no raw channel behind it.
chan<int> ch;
ASSERT(ch == NULL);
ASSERT(!(ch != NULL));
ASSERT(ch._rawchan() == NULL);
// after makechan the wrapper is non-nil and is the only reference holder.
ch = makechan<int>();
ASSERT(!(ch == NULL));
ASSERT(ch != NULL);
ASSERT(ch._rawchan() != NULL);
// remember the raw channel so that the refcount can be observed
// independently of the chan<int> wrappers below.
_chan *_ch = ch._rawchan();
ASSERT(_chanrefcnt(_ch) == 1);
// copy ctor
{
// the copy refers to the same raw channel and the refcount goes 1 -> 2.
chan<int> ch2(ch);
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 2);
ASSERT(ch2 == ch);
ASSERT(!(ch2 != ch));
// ch2 goes out of scope, decref'ing via ~chan
}
ASSERT(_chanrefcnt(_ch) == 1);
// copy =
{
chan<int> ch2;
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
// copy assignment shares the raw channel: refcount 1 -> 2.
ch2 = ch;
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 2);
ASSERT(ch2 == ch);
ASSERT(!(ch2 != ch));
// assigning NULL drops ch2's reference: refcount 2 -> 1.
ch2 = NULL;
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ASSERT(_chanrefcnt(_ch) == 1);
ASSERT(!(ch2 == ch));
ASSERT(ch2 != ch);
}
ASSERT(_chanrefcnt(_ch) == 1);
// move ctor
// the reference is transferred: the source becomes nil and the refcount
// stays at 1.
chan<int> ch2(move(ch));
ASSERT(ch == NULL);
ASSERT(ch._rawchan() == NULL);
ASSERT(ch2 != NULL);
ASSERT(ch2._rawchan() == _ch);
ASSERT(_chanrefcnt(_ch) == 1);
// move =
// same as above in the opposite direction: ch2 -> ch.
ch = move(ch2);
ASSERT(ch != NULL);
ASSERT(ch._rawchan() == _ch);
ASSERT(ch2 == NULL);
ASSERT(ch2._rawchan() == NULL);
ASSERT(_chanrefcnt(_ch) == 1);
// ch goes out of scope and destroys raw channel
}
// verify chan<T> IO.
// Point is a small two-field aggregate used as a non-scalar channel element.
struct Point {
    int x;
    int y;
};
void _test_chan_cpp() {
chan<structZ> done = makechan<structZ>();
chan<int> chi = makechan<int>(1);
chan<Point> chp = makechan<Point>(); chp = NULL;
int i, j, _;
Point p;
bool jok;
i=+1; chi.send(i);
j=-1; j = chi.recv();
if (j != i)
panic("send -> recv != I");
i = 2;
_ = select({
done.recvs(), // 0
chi.sends(&i), // 1
chp.recvs(&p), // 2
chi.recvs(&j, &jok), // 3
_default, // 4
});
if (_ != 1)
panic("select: selected !1");
tie(j, jok) = chi.recv_();
if (!(j == 2 && jok == true))
panic("recv_ != (2, true)");
chi.close();
tie(j, jok) = chi.recv_();
if (!(j == 0 && jok == false))
panic("recv_ from closed != (0, false)");
// XXX chan<chan> is currently TODO
//chan<chan<int>> zzz;
}
// waitBlocked polls channel ch until a receive (if rx) or a send (if tx)
// operation is blocked waiting on it.
//
// It panics if ch is nil, and panics with "deadlock" if nothing becomes
// blocked within 10 seconds.
void waitBlocked(_chan *ch, bool rx, bool tx) {
    if (ch == NULL)
        panic("wait blocked: called on nil channel");

    const double tstart = time::now();
    for (;;) {
        // receive queue is consulted first, send queue second
        bool blocked = (rx && _tchanrecvqlen(ch) != 0) ||
                       (tx && _tchansendqlen(ch) != 0);
        if (blocked)
            return;

        double elapsed = time::now() - tstart;
        if (elapsed > 10)   // waited > 10 seconds - likely deadlock
            panic("deadlock");

        time::sleep(0);     // yield to another thread / coroutine
    }
}
// waitBlocked_RX waits until a receive operation blocks on ch.
template<typename T>
void waitBlocked_RX(chan<T> ch) {
    _chan *raw = ch._rawchan();
    waitBlocked(raw, /*rx=*/true, /*tx=*/false);
}
// waitBlocked_TX waits until a send operation blocks on ch.
template<typename T>
void waitBlocked_TX(chan<T> ch) {
    _chan *raw = ch._rawchan();
    waitBlocked(raw, /*rx=*/false, /*tx=*/true);
}
// usestack_and_call calls f from nframes nested frames below the caller.
//
// Pushing the C-stack down this way is used to make sure that, when f blocks
// and execution is switched to another g, greenlet saves f's C-stack frame
// onto heap:
//
//   ---  ~~~
//             stack of another g
//   ---  ~~~
//
//    .
//    .
//    .
//
//    f    ->  heap
static void usestack_and_call(function<void()> f, int nframes=128) {
    if (nframes != 0) {
        // not deep enough yet: descend one more frame
        usestack_and_call(f, nframes-1);
        return;
    }

    f();
}
// verify that send/recv/select correctly route their onstack arguments through onheap proxies.
void _test_chan_vs_stackdeadwhileparked() {
// problem: under greenlet g's stack lives on system stack and is swapped as needed
// onto heap and back on g switch. This way if e.g. recv() is called with
// prx pointing to stack, and the stack is later copied to heap and replaced
// with stack of another g, the sender, if writing to original prx directly,
// will write to stack of different g, and original recv g, after wakeup,
// will see unchanged memory - with stack content that was saved to heap.
//
// to avoid this, send/recv/select create onheap proxies for onstack
// arguments and use those proxies as actual argument for send/receive.
// recv
auto ch = makechan<int>();
go([&]() {
waitBlocked_RX(ch);
usestack_and_call([&]() {
ch.send(111);
});
});
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 111)
panic("recv(111) != 111");
});
// send
auto done = makechan<structZ>();
go([&]() {
waitBlocked_TX(ch);
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 222)
panic("recv(222) != 222");
});
done.close();
});
usestack_and_call([&]() {
ch.send(222);
});
done.recv();
// select(recv)
go([&]() {
waitBlocked_RX(ch);
usestack_and_call([&]() {
ch.send(333);
});
});
usestack_and_call([&]() {
int rx = 0;
int _ = select({ch.recvs(&rx)});
if (_ != 0)
panic("select(recv, 333): selected !0");
if (rx != 333)
panic("select(recv, 333): recv != 333");
});
// select(send)
done = makechan<structZ>();
go([&]() {
waitBlocked_TX(ch);
usestack_and_call([&]() {
int rx = ch.recv();
if (rx != 444)
panic("recv(444) != 444");
});
done.close();
});
usestack_and_call([&]() {
int tx = 444;
int _ = select({ch.sends(&tx)});
if (_ != 0)
panic("select(send, 444): selected !0");
});
done.recv();
}
// small test to verify C++ go. // small test to verify C++ go.
static void _work(int i); static void _work(int i, chan<structZ> done);
void _test_go_cpp() { void _test_go_cpp() {
go(_work, 111); // not λ to test that go correctly passes arguments auto done = makechan<structZ>();
// TODO wait till _work is done go(_work, 111, done); // not λ to test that go correctly passes arguments
done.recv();
} }
static void _work(int i) { static void _work(int i, chan<structZ> done) {
if (i != 111) if (i != 111)
panic("_work: i != 111"); panic("_work: i != 111");
done.close();
} }
...@@ -26,20 +26,69 @@ ...@@ -26,20 +26,69 @@
#include "golang/libgolang.h" #include "golang/libgolang.h"
#include <stdlib.h> #include <stdlib.h>
// Point is a small two-field struct used as a non-scalar channel element.
typedef struct Point {
    int x;
    int y;
} Point;
// _test_chan_c exercises the low-level C channel API: _chansend, _chanrecv,
// _chanrecv_, _chanclose and _chanselect on a buffered channel of int, with an
// element-less channel and a nil channel taking part in select only.
void _test_chan_c(void) {
    // channel creation is checked the same way as in _test_go_c
    _chan *done = _makechan(0, 0);
    if (done == NULL)
        panic("_makechan -> failed");
    _chan *chi = _makechan(sizeof(int), 1);
    if (chi == NULL)
        panic("_makechan -> failed");
    _chan *chp = NULL;  // nil channel

    int   i, j, _;
    Point p;
    bool  jok;

    // send -> recv roundtrip through the buffer
    i=+1; _chansend(chi, &i);
    j=-1; _chanrecv(chi, &j);
    if (j != i)
        panic("send -> recv != I");

    // select: chi is empty again, so sending into it (case 1) must be chosen
    i = 2;
    _selcase sel[5];
    sel[0]  = _selrecv(done, NULL);
    sel[1]  = _selsend(chi, &i);
    sel[2]  = _selrecv(chp, &p);
    sel[3]  = _selrecv_(chi, &j, &jok);
    sel[4]  = _default;
    _ = _chanselect(sel, 5);
    if (_ != 1)
        panic("select: selected !1");

    // the value sent by select is received back with ok=true
    jok = _chanrecv_(chi, &j);
    if (!(j == 2 && jok == true))
        panic("recv_ != (2, true)");

    // receive from closed channel gives zero value and ok=false
    _chanclose(chi);
    jok = _chanrecv_(chi, &j);
    if (!(j == 0 && jok == false))
        panic("recv_ from closed != (0, false)");

    // release channels; chp is nil and is passed to _chanxdecref as well
    _chanxdecref(done);
    _chanxdecref(chi);
    _chanxdecref(chp);
}
// small test to verify C _taskgo. // small test to verify C _taskgo.
struct _work_arg{int i;}; struct _work_arg{int i; _chan *done;};
static void _work(void *); static void _work(void *);
void _test_go_c(void) { void _test_go_c(void) {
_chan *done = _makechan(0,0);
if (done == NULL)
panic("_makechan -> failed");
struct _work_arg *_ = malloc(sizeof(*_)); struct _work_arg *_ = malloc(sizeof(*_));
if (_ == NULL) if (_ == NULL)
panic("malloc _work_arg -> failed"); panic("malloc _work_arg -> failed");
_->i = 111; _->i = 111;
_->done = done;
_taskgo(_work, _); _taskgo(_work, _);
// TODO wait till _work is done _chanrecv(done, NULL);
_chanxdecref(done);
} }
static void _work(void *__) { static void _work(void *__) {
struct _work_arg *_ = (struct _work_arg *)__; struct _work_arg *_ = (struct _work_arg *)__;
if (_->i != 111) if (_->i != 111)
panic("_work: i != 111"); panic("_work: i != 111");
_chanclose(_->done);
free(_); free(_);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment