Commit f2847307 authored by Kirill Smelkov's avatar Kirill Smelkov

golang: Rework pychan to use C-level channel API

We will soon rework pychan to be python wrapper not only for
chan<object>, but also for other channels of various C types - e.g.
chan<structZ>, chan<int>, etc.

To prepare for this let's first rework pychan from using chan[PyObject*]
into raw _chan* functions. This will allow us to use the same functions
over raw channels while dynamically dispatching on channel element type.
parent 4f6a9e09
......@@ -118,10 +118,8 @@ cdef void topyexc() except *
cpdef pypanic(arg)
# pychan is chan<object>
from cpython cimport PyObject
ctypedef PyObject *pPyObject # https://github.com/cython/cython/issues/534
from cython cimport final
@final
cdef class pychan:
cdef chan[pPyObject] ch
cdef _chan *_ch
......@@ -32,7 +32,8 @@ from __future__ import print_function, absolute_import
# init libgolang runtime early
_init_libgolang()
from cpython cimport Py_INCREF, Py_DECREF, PY_MAJOR_VERSION
from cpython cimport PyObject, Py_INCREF, Py_DECREF, PY_MAJOR_VERSION
ctypedef PyObject *pPyObject # https://github.com/cython/cython/issues/534
cdef extern from "Python.h":
ctypedef struct PyTupleObject:
PyObject **ob_item
......@@ -145,21 +146,22 @@ cdef void __goviac(void *arg) nogil:
@final
cdef class pychan:
def __cinit__(pych, size=0):
pych.ch = makechan_pyobj_pyexc(size)
pych._ch = _makechan_pyexc(sizeof(PyObject*), size)
def __dealloc__(pych):
# on del: drain buffered channel to decref sent objects.
# verify that the channel is not connected anywhere outside us.
# (if it was present also somewhere else - draining would be incorrect)
if pych.ch == nil:
if pych._ch == NULL:
return
cdef int refcnt = _chanrefcnt(pych.ch._rawchan())
cdef int refcnt = _chanrefcnt(pych._ch)
if refcnt != 1:
# cannot raise py-level exception in __dealloc__
Py_FatalError("pychan.__dealloc__: chan.refcnt=%d ; must be =1" % refcnt)
cdef chan[pPyObject] ch = pych.ch
pych.ch = nil # does _chanxdecref(ch)
cdef chan[pPyObject] ch = _wrapchan[pPyObject](pych._ch)
_chanxdecref(pych._ch)
pych._ch = NULL
cdef PyObject *_rx
while ch.len() != 0:
......@@ -177,12 +179,14 @@ cdef class pychan:
# send sends object to a receiver.
def send(pych, obj):
cdef PyObject *_tx = <PyObject*>obj
# increment obj reference count - until received the channel is holding pointer to the object.
Py_INCREF(obj)
try:
with nogil:
chansend_pyexc(pych.ch, <PyObject *>obj)
_chansend_pyexc(pych._ch, &_tx)
except: # not only _PanicError as send can also throw e.g. bad_alloc
# the object was not sent - e.g. it was "send on a closed channel"
Py_DECREF(obj)
......@@ -197,7 +201,7 @@ cdef class pychan:
cdef bint ok
with nogil:
_rx, ok = chanrecv__pyexc(pych.ch)
ok = _chanrecv__pyexc(pych._ch, &_rx)
if not ok:
return (None, ok)
......@@ -215,13 +219,13 @@ cdef class pychan:
# close closes sending side of the channel.
def close(pych):
with nogil:
chanclose_pyexc(pych.ch)
_chanclose_pyexc(pych._ch)
def __len__(pych):
return chanlen_pyexc(pych.ch)
return _chanlen_pyexc(pych._ch)
def __repr__(pych):
if pych.ch == nil:
if pych._ch == NULL:
return "nilchan"
else:
return super(pychan, pych).__repr__()
......@@ -231,7 +235,8 @@ cdef class pychan:
#
# On nil channel: send/recv block forever; close panics.
cdef pychan _pynilchan = pychan()
_pynilchan.ch = chan[pPyObject]() # = NULL
_chanxdecref(_pynilchan._ch)
_pynilchan._ch = NULL
pynilchan = _pynilchan
......@@ -301,7 +306,7 @@ def pyselect(*pycasev):
# we'll decref the object if it won't be sent.
# see pychan.send for details.
Py_INCREF(tx)
casev[i] = pych.ch.sends(p_tx)
casev[i] = _selsend(pych._ch, p_tx)
# recv
else:
......@@ -311,9 +316,9 @@ def pyselect(*pycasev):
pych = pyrecv.__self__
if pyrecv.__name__ == "recv": # XXX better check PyCFunction directly
casev[i] = pych.ch.recvs(&_rx)
casev[i] = _selrecv(pych._ch, &_rx)
elif pyrecv.__name__ == "recv_": # XXX better check PyCFunction directly
casev[i] = pych.ch.recvs(&_rx, &rxok)
casev[i] = _selrecv_(pych._ch, &_rx, &rxok)
else:
pypanic("pyselect: recv expected: %r" % (pyrecv,))
......@@ -386,27 +391,39 @@ cdef void _init_libgolang() except*:
# ---- misc ----
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
_chan *_makechan(unsigned elemsize, unsigned size)
chan[T] _wrapchan[T](_chan *)
void _chanxincref(_chan *ch)
void _chanxdecref(_chan *ch)
int _chanrefcnt(_chan *ch)
void _chansend(_chan *ch, const void *ptx)
bint _chanrecv_(_chan *ch, void *prx)
void _chanclose(_chan *ch)
unsigned _chanlen(_chan *ch)
int _chanselect(_selcase *casev, int casec)
_selcase _selsend(_chan *ch, const void *ptx)
_selcase _selrecv(_chan *ch, void *prx)
_selcase _selrecv_(_chan *ch, void *prx, bint *pok)
void _taskgo(void (*f)(void *), void *arg)
cdef nogil:
chan[pPyObject] makechan_pyobj_pyexc(unsigned size) except +topyexc:
return makechan[pPyObject](size)
_chan* _makechan_pyexc(unsigned elemsize, unsigned size) except +topyexc:
return _makechan(elemsize, size)
void chansend_pyexc(chan[pPyObject] ch, PyObject *_tx) except +topyexc:
ch.send(_tx)
void _chansend_pyexc(_chan *ch, const void *ptx) except +topyexc:
_chansend(ch, ptx)
(PyObject*, bint) chanrecv__pyexc(chan[pPyObject] ch) except +topyexc:
_ = ch.recv_()
return (_.first, _.second) # TODO teach Cython to coerce pair[X,Y] -> (X,Y)
bint _chanrecv__pyexc(_chan *ch, void *prx) except +topyexc:
return _chanrecv_(ch, prx)
void chanclose_pyexc(chan[pPyObject] ch) except +topyexc:
ch.close()
void _chanclose_pyexc(_chan *ch) except +topyexc:
_chanclose(ch)
unsigned chanlen_pyexc(chan[pPyObject] ch) except +topyexc:
return ch.len()
unsigned _chanlen_pyexc(_chan *ch) except +topyexc:
return _chanlen(ch)
int _chanselect_pyexc(const _selcase *casev, int casec) except +topyexc:
return _chanselect(casev, casec)
......
......@@ -36,13 +36,13 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
# pylen_{recv,send}q returns len(_chan._{recv,send}q)
def pylen_recvq(pychan pych not None): # -> int
if pych.ch == nil:
if pych._ch == NULL:
raise AssertionError('len(.recvq) on nil channel')
return _tchanrecvqlen(pych.ch._rawchan())
return _tchanrecvqlen(pych._ch)
def pylen_sendq(pychan pych not None): # -> int
if pych.ch == nil:
if pych._ch == NULL:
raise AssertionError('len(.sendq) on nil channel')
return _tchansendqlen(pych.ch._rawchan())
return _tchansendqlen(pych._ch)
# runtime/libgolang_test.cpp
cdef extern from *:
......@@ -68,7 +68,7 @@ def pywaitBlocked(pychanop):
pypanic("wait blocked: unexpected chan method: %r" % (pychanop,))
with nogil:
waitBlocked(pych.ch._rawchan(), nrecv, nsend)
waitBlocked(pych._ch, nrecv, nsend)
# `with pypanicWhenBlocked` hooks into libgolang _blockforever to raise panic with
......
......@@ -287,6 +287,7 @@ static inline void go(F /*std::function<void(Argv...)>*/ f, Argv... argv) {
template<typename T> class chan;
template<typename T> static chan<T> makechan(unsigned size=0);
template<typename T> static chan<T> _wrapchan(_chan *_ch);
// chan<T> provides type-safe wrapper over _chan.
//
......@@ -299,6 +300,7 @@ class chan {
public:
inline chan() { _ch = NULL; } // nil channel if not explicitly initialized
friend chan<T> makechan<T>(unsigned size);
friend chan<T> _wrapchan<T>(_chan *_ch);
inline ~chan() { _chanxdecref(_ch); _ch = NULL; }
// = nil
......@@ -361,14 +363,30 @@ public:
inline _chan *_rawchan() const { return _ch; }
};
// _elemsize<T> returns element size for chan<T>.
template<typename T> static inline
unsigned _elemsize() {
return std::is_empty<T>::value
? 0 // eg struct{} for which sizeof() gives 1 - *not* 0
: sizeof(T);
}
// makechan<T> makes new chan<T> with capacity=size.
template<typename T> static inline
chan<T> makechan(unsigned size) {
chan<T> ch;
unsigned elemsize = std::is_empty<T>::value
? 0 // eg struct{} for which sizeof() gives 1 - *not* 0
: sizeof(T);
ch._ch = _makechan(elemsize, size);
ch._ch = _makechan(_elemsize<T>(), size);
return ch;
}
// _wrapchan<T> wraps raw channel with chan<T>.
// raw channel must be either NULL or its element size must correspond to T.
LIBGOLANG_API void __wrapchan(_chan *_ch, unsigned elemsize);
template<typename T> static inline
chan<T> _wrapchan(_chan *_ch) {
chan<T> ch;
__wrapchan(_ch, _elemsize<T>());
ch._ch = _ch;
return ch;
}
......
......@@ -439,6 +439,15 @@ _chan *_makechan(unsigned elemsize, unsigned size) {
return ch;
}
// __wrapchan serves _wrapchan<T>.
void __wrapchan(_chan *ch, unsigned elemsize) {
if (ch == NULL)
return; // nil, no elemsize checking
if (ch->_elemsize != elemsize)
panic("wrapchan: elemsize mismatch");
_chanxincref(ch);
}
// _chanxincref increments reference counter of the channel.
//
// it is noop if ch=nil.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment