Commit 47111d3e authored by Kirill Smelkov's avatar Kirill Smelkov

libgolang: Teach select to accept inplace tx data

Currently select, via _selcase, requires users to provide pointers to tx
and rx buffers. However if element type itself can fit into a pointer
word, we can put the element directly into _selcase and mark the case
with a flag, that it contains inplace data instead of referring to
external storage. This will be helpful in upcoming patch where we'll
teach pychan to work with several element types, not only pyobject
elements.

This patch does careful introduction of _selcase.flags - in such a way
that the size of _selcase stays the same as it was before by using
bitfields. The .ptxrx pointer is unioned with newly introduced inplace
uint64 .itxrx data, which is used by select instead of .ptxrx if the
flag is set. The usage of uint64 should not increase _selcase size on
64-bit platforms.

Then _selcase.ptx() and .prx() accessors are adapted accordingly and
the rest of the changes are corresponding test and
_chanselect2<onstack=false> adaptation.

This functionality is kind of low-level and is not exposed via any
_selsend() or chan.sends() API changes. Whenever inplace tx should be
used, the case should be prepared either completely manually, or with
e.g. first calling _selsend() and then manually changing .flags and
.itxrx.  Added test serves as the example on how to do it.

Inplace rx is currently forbidden - because supporting that would require
to drop const from casev select argument. However in the future, for
symmetry, we might want to support that as well.

P.S.

Since write to selcase.itxrx requires casting pointers e.g. like this:

	*(int *)&sel[0].itxrx = 12345;

it breaks C99 strict aliasing and by default compiler can generate bad
code on such pattern. To the problem we adapt the build system
to default compiler to no-strict-aliasing (as many other projects do,
e.g. Linux kernel) with the idea that in many cases where strict
aliasing was intended to help it actually does not, because e.g. pointer
types are the same, while explicitly marking pointers with `restrict`
keyword does help indeed.

Nothing new for Python2 here, as it is using -fno-strict-aliasing by
itself. However Python3 is compiling without -fno-strict-aliasing:
https://python.org/dev/peps/pep-3123 .
parent 2590e9a7
...@@ -46,6 +46,7 @@ In addition to Cython/nogil API, golang.pyx provides runtime for golang.py: ...@@ -46,6 +46,7 @@ In addition to Cython/nogil API, golang.pyx provides runtime for golang.py:
from libcpp cimport nullptr_t, nullptr as nil from libcpp cimport nullptr_t, nullptr as nil
from libcpp.utility cimport pair from libcpp.utility cimport pair
from libc.stdint cimport uint64_t
cdef extern from *: cdef extern from *:
ctypedef bint cbool "bool" ctypedef bint cbool "bool"
...@@ -103,10 +104,15 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil: ...@@ -103,10 +104,15 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
_CHANSEND _CHANSEND
_CHANRECV _CHANRECV
_DEFAULT _DEFAULT
enum _selflags:
_INPLACE_DATA
cppclass _selcase: cppclass _selcase:
_chanop op _chan *ch
void *ptxrx _chanop op
cbool *rxok unsigned flags
void *ptxrx
uint64_t itxrx
cbool *rxok
const void *ptx() const const void *ptx() const
void *prx() const void *prx() const
......
...@@ -176,6 +176,7 @@ cdef extern from * nogil: ...@@ -176,6 +176,7 @@ cdef extern from * nogil:
extern void _test_close_wakeup_all_vsrecv(); extern void _test_close_wakeup_all_vsrecv();
extern void _test_close_wakeup_all_vsselect(); extern void _test_close_wakeup_all_vsselect();
extern void _test_select_win_while_queue(); extern void _test_select_win_while_queue();
extern void _test_select_inplace();
""" """
void _test_chan_cpp_refcount() except +topyexc void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc void _test_chan_cpp() except +topyexc
...@@ -184,6 +185,7 @@ cdef extern from * nogil: ...@@ -184,6 +185,7 @@ cdef extern from * nogil:
void _test_close_wakeup_all_vsrecv() except +topyexc void _test_close_wakeup_all_vsrecv() except +topyexc
void _test_close_wakeup_all_vsselect() except +topyexc void _test_close_wakeup_all_vsselect() except +topyexc
void _test_select_win_while_queue() except +topyexc void _test_select_win_while_queue() except +topyexc
void _test_select_inplace() except +topyexc
def test_chan_cpp_refcount(): def test_chan_cpp_refcount():
with nogil: with nogil:
_test_chan_cpp_refcount() _test_chan_cpp_refcount()
...@@ -205,3 +207,6 @@ def test_close_wakeup_all_vsselect(): ...@@ -205,3 +207,6 @@ def test_close_wakeup_all_vsselect():
def test_select_win_while_queue(): def test_select_win_while_queue():
with nogil: with nogil:
_test_select_win_while_queue() _test_select_win_while_queue()
def test_select_inplace():
with nogil:
_test_select_inplace()
...@@ -150,12 +150,26 @@ enum _chanop { ...@@ -150,12 +150,26 @@ enum _chanop {
_DEFAULT = 2, _DEFAULT = 2,
}; };
enum _selflags {
// _INPLACE_DATA indicates that select case data is stored in
// _selcase.itxrx instead of in *_selcase.ptxrx .
// XXX can be used only for send for now. In the future, for symmetry, we
// might want to allow _INPLACE_DATA for recv too.
_INPLACE_DATA = 1,
};
// _selcase represents one select case. // _selcase represents one select case.
typedef struct _selcase { typedef struct _selcase {
_chan *ch; // channel _chan *ch; // channel
enum _chanop op; // chansend/chanrecv/default enum _chanop op : 8; // chansend/chanrecv/default
void *ptxrx; // chansend: ptx; chanrecv: prx enum _selflags flags : 8; // e.g. _INPLACE_DATA
bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used unsigned :16;
union {
void *ptxrx; // chansend: ptx; chanrecv: prx
uint64_t itxrx; // used instead of .ptxrx if .flags&_INPLACE_DATA != 0
};
bool *rxok; // chanrecv: where to save ok if !NULL; otherwise not used
#ifdef __cplusplus #ifdef __cplusplus
// ptx returns pointer to data to send for this case. // ptx returns pointer to data to send for this case.
...@@ -176,6 +190,7 @@ _selcase _selsend(_chan *ch, const void *ptx) { ...@@ -176,6 +190,7 @@ _selcase _selsend(_chan *ch, const void *ptx) {
_selcase _ = { _selcase _ = {
.ch = ch, .ch = ch,
.op = _CHANSEND, .op = _CHANSEND,
.flags = (enum _selflags)0,
.ptxrx = (void *)ptx, .ptxrx = (void *)ptx,
.rxok = NULL, .rxok = NULL,
}; };
...@@ -188,6 +203,7 @@ _selcase _selrecv(_chan *ch, void *prx) { ...@@ -188,6 +203,7 @@ _selcase _selrecv(_chan *ch, void *prx) {
_selcase _ = { _selcase _ = {
.ch = ch, .ch = ch,
.op = _CHANRECV, .op = _CHANRECV,
.flags = (enum _selflags)0,
.ptxrx = prx, .ptxrx = prx,
.rxok = NULL, .rxok = NULL,
}; };
...@@ -200,6 +216,7 @@ _selcase _selrecv_(_chan *ch, void *prx, bool *pok) { ...@@ -200,6 +216,7 @@ _selcase _selrecv_(_chan *ch, void *prx, bool *pok) {
_selcase _ = { _selcase _ = {
.ch = ch, .ch = ch,
.op = _CHANRECV, .op = _CHANRECV,
.flags = (enum _selflags)0,
.ptxrx = prx, .ptxrx = prx,
.rxok = pok, .rxok = pok,
}; };
......
...@@ -150,10 +150,15 @@ def Extension(name, sources, **kw): ...@@ -150,10 +150,15 @@ def Extension(name, sources, **kw):
lang = kw.setdefault('language', 'c++') lang = kw.setdefault('language', 'c++')
# default to C++11 (chan[T] & co require C++11 features) # default to C++11 (chan[T] & co require C++11 features)
ccdefault = []
if lang == 'c++': if lang == 'c++':
ccextra = kw.get('extra_compile_args', [])[:] ccdefault.append('-std=c++11')
ccextra.insert(0, '-std=c++11') # if another -std=... was already there - it will override us # default to no strict-aliasing
kw['extra_compile_args'] = ccextra ccdefault.append('-fno-strict-aliasing')
_ = kw.get('extra_compile_args', [])[:]
_[0:0] = ccdefault # if another e.g. -std=... was already there -
kw['extra_compile_args'] = _ # - it will override us
# some depends to workaround a bit lack of proper dependency tracking in # some depends to workaround a bit lack of proper dependency tracking in
# setuptools/distutils. # setuptools/distutils.
......
...@@ -859,6 +859,7 @@ void _chan::_dataq_popleft(void *prx) { ...@@ -859,6 +859,7 @@ void _chan::_dataq_popleft(void *prx) {
const _selcase _default = { const _selcase _default = {
.ch = NULL, .ch = NULL,
.op = _DEFAULT, .op = _DEFAULT,
.flags = (_selflags)0,
.ptxrx = NULL, .ptxrx = NULL,
.rxok = NULL, .rxok = NULL,
}; };
...@@ -867,13 +868,15 @@ const void *_selcase::ptx() const { ...@@ -867,13 +868,15 @@ const void *_selcase::ptx() const {
const _selcase *cas = this; const _selcase *cas = this;
if (cas->op != _CHANSEND) if (cas->op != _CHANSEND)
panic("_selcase: ptx: op != send"); panic("_selcase: ptx: op != send");
return cas->ptxrx; return cas->flags & _INPLACE_DATA ? &cas->itxrx : cas->ptxrx;
} }
void *_selcase::prx() const { void *_selcase::prx() const {
const _selcase *cas = this; const _selcase *cas = this;
if (cas->op != _CHANRECV) if (cas->op != _CHANRECV)
panic("_selcase: prx: op != recv"); panic("_selcase: prx: op != recv");
if (cas->flags & _INPLACE_DATA)
panic("_selcase: prx: recv with inplace data");
return cas->ptxrx; return cas->ptxrx;
} }
...@@ -943,6 +946,12 @@ int _chanselect(const _selcase *casev, int casec) { ...@@ -943,6 +946,12 @@ int _chanselect(const _selcase *casev, int casec) {
// recv // recv
else if (cas->op == _CHANRECV) { else if (cas->op == _CHANRECV) {
if (ch != NULL) { // nil chan is never ready if (ch != NULL) { // nil chan is never ready
// recv into inplace data is not supported
// ( in the future we might want to support it for symmetry with
// send, but it will requre to drop const from casev )
if (cas->flags & _INPLACE_DATA)
panic("select: recv into inplace data");
ch->_mu.lock(); ch->_mu.lock();
if (1) { if (1) {
bool ok, done = ch->_tryrecv(cas->prx(), &ok); bool ok, done = ch->_tryrecv(cas->prx(), &ok);
...@@ -1000,7 +1009,7 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec, ...@@ -1000,7 +1009,7 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec,
unsigned rxmax=0, txtotal=0; unsigned rxmax=0, txtotal=0;
// reallocate chan .tx / .rx to heap; adjust casev // reallocate chan .tx / .rx to heap; adjust casev
// XXX avoid doing this if all .tx and .rx are on heap? // XXX avoid doing this if all .tx and .rx are on heap or have inplace data?
unique_ptr<_selcase[]> casev_onheap (new _selcase[casec]); unique_ptr<_selcase[]> casev_onheap (new _selcase[casec]);
for (i = 0; i < casec; i++) { for (i = 0; i < casec; i++) {
const _selcase *cas = &casev[i]; const _selcase *cas = &casev[i];
...@@ -1008,7 +1017,8 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec, ...@@ -1008,7 +1017,8 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec,
if (cas->ch == NULL) // nil chan if (cas->ch == NULL) // nil chan
continue; continue;
if (cas->op == _CHANSEND) { if (cas->op == _CHANSEND) {
txtotal += cas->ch->_elemsize; if (!(cas->flags & _INPLACE_DATA))
txtotal += cas->ch->_elemsize;
} }
else if (cas->op == _CHANRECV) { else if (cas->op == _CHANRECV) {
rxmax = max(rxmax, cas->ch->_elemsize); rxmax = max(rxmax, cas->ch->_elemsize);
...@@ -1032,9 +1042,11 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec, ...@@ -1032,9 +1042,11 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec,
if (cas->ch == NULL) // nil chan if (cas->ch == NULL) // nil chan
continue; continue;
if (cas->op == _CHANSEND) { if (cas->op == _CHANSEND) {
memcpy(ptx, cas->ptxrx, cas->ch->_elemsize); if (!(cas->flags & _INPLACE_DATA)) {
cas->ptxrx = ptx; memcpy(ptx, cas->ptxrx, cas->ch->_elemsize);
ptx += cas->ch->_elemsize; cas->ptxrx = ptx;
ptx += cas->ch->_elemsize;
}
} }
else if (cas->op == _CHANRECV) { else if (cas->op == _CHANRECV) {
cas->ptxrx = rxtxdata; cas->ptxrx = rxtxdata;
......
...@@ -391,3 +391,73 @@ void _test_select_win_while_queue() { ...@@ -391,3 +391,73 @@ void _test_select_win_while_queue() {
for (i=0; i<N; i++) for (i=0; i<N; i++)
__test_select_win_while_queue(); __test_select_win_while_queue();
} }
// verify select behaviour with _INPLACE_DATA cases.
void _test_select_inplace() {
auto ch = makechan<int>();
int i;
// inplace tx
go([ch]() {
_selcase sel[1];
sel[0] = ch.sends(NULL);
*(int *)&sel[0].itxrx = 12345;
sel[0].flags = _INPLACE_DATA;
int _ = select(sel);
ASSERT(_ == 0);
});
i = ch.recv();
ASSERT(i == 12345);
// inplace rx - currently forbidden to keep casev const.
_selcase sel[1];
sel[0] = ch.recvs();
sel[0].flags = _INPLACE_DATA;
const char *err = NULL;
try {
select(sel);
} catch (...) {
err = recover();
}
ASSERT(err != NULL);
ASSERT(!strcmp(err, "select: recv into inplace data"));
// _selcase ptx/prx
_selcase cas = _default;
err = NULL;
try {
cas.ptx();
} catch (...) {
err = recover();
}
ASSERT(err != NULL);
ASSERT(!strcmp(err, "_selcase: ptx: op != send"));
err = NULL;
try {
cas.prx();
} catch (...) {
err = recover();
}
ASSERT(err != NULL);
ASSERT(!strcmp(err, "_selcase: prx: op != recv"));
cas = ch.sends(&i);
ASSERT(cas.ptx() == &i);
cas.flags = _INPLACE_DATA;
ASSERT(cas.ptx() == &cas.itxrx);
cas = ch.recvs(&i);
ASSERT(cas.prx() == &i);
cas.flags = _INPLACE_DATA;
err = NULL;
try {
cas.prx();
} catch (...) {
err = recover();
}
ASSERT(err != NULL);
ASSERT(!strcmp(err, "_selcase: prx: recv with inplace data"));
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment