Commit 5aa1e899 authored by Kirill Smelkov

libgolang: Fix select crash while accessing destroyed channel upon wakeup

Similarly to the situation described in dcf4ebd1 (libgolang: Fix chan.close
to dequeue subscribers atomically), select can also be accessing a
channel object at the time of wakeup when that channel could already be
destroyed: select queues waiters to the channels' recv/send queues and upon
wakeup needs to dequeue them. This requires locking the channels, not to
mention that destroying a channel with a non-empty subscribers queue will
trigger a bug panic.

Contrary to the fix for recv, we cannot rework select not to access
channel objects after wakeup, because for select, upon wakeup, all queued
channels could already be destroyed, not only the selected one. Thus the fix
here is to incref/decref the channels for the duration in which we need to
access them.

The bug was not caught by existing tests and was noticed while reviewing
libgolang.cpp for concurrency issues. With the added test (the fix is
covered by a slightly amended _test_close_wakeup_all), the bug, if not
fixed, manifests itself under TSAN as, e.g., the following:

WARNING: ThreadSanitizer: data race (pid=4421)
  Write of size 8 at 0x7b1400000650 by thread T9:
    #0 free ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:649 (libtsan.so.0+0x2b46a)
    #1 free ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:643 (libtsan.so.0+0x2b46a)
    #2 golang::_chan::decref() golang/runtime/libgolang.cpp:479 (liblibgolang.so.0.1+0x4822)
    #3 _chanxdecref golang/runtime/libgolang.cpp:461 (liblibgolang.so.0.1+0x487a)
    #4 golang::chan<int>::operator=(decltype(nullptr)) golang/libgolang.h:296 (_golang_test.so+0x14cf9)
    #5 operator() golang/runtime/libgolang_test.cpp:304 (_golang_test.so+0x14cf9)
    #6 __invoke_impl<void, __test_close_wakeup_all(bool)::<lambda()>&> /usr/include/c++/8/bits/invoke.h:60 (_golang_test.so+0x14cf9)
    #7 __invoke<__test_close_wakeup_all(bool)::<lambda()>&> /usr/include/c++/8/bits/invoke.h:95 (_golang_test.so+0x14cf9)
    #8 __call<void> /usr/include/c++/8/functional:400 (_golang_test.so+0x14cf9)
    #9 operator()<> /usr/include/c++/8/functional:484 (_golang_test.so+0x14cf9)
    #10 _M_invoke /usr/include/c++/8/bits/std_function.h:297 (_golang_test.so+0x14cf9)
    #11 std::function<void ()>::operator()() const /usr/include/c++/8/bits/std_function.h:687 (_golang_test.so+0x1850c)
    #12 operator() golang/libgolang.h:273 (_golang_test.so+0x1843a)
    #13 _FUN golang/libgolang.h:271 (_golang_test.so+0x1843a)
    #14 <null> <null> (python2.7+0x1929e3)

  Previous read of size 8 at 0x7b1400000650 by thread T10:
    #0 golang::Sema::acquire() golang/runtime/libgolang.cpp:168 (liblibgolang.so.0.1+0x413a)
    #1 golang::Mutex::lock() golang/runtime/libgolang.cpp:179 (liblibgolang.so.0.1+0x424a)
    #2 operator() golang/runtime/libgolang.cpp:1044 (liblibgolang.so.0.1+0x424a)
    #3 _M_invoke /usr/include/c++/8/bits/std_function.h:297 (liblibgolang.so.0.1+0x424a)
    #4 std::function<void ()>::operator()() const /usr/include/c++/8/bits/std_function.h:687 (liblibgolang.so.0.1+0x5f07)
    #5 golang::_deferred::~_deferred() golang/runtime/libgolang.cpp:215 (liblibgolang.so.0.1+0x5f07)
    #6 __chanselect2 golang/runtime/libgolang.cpp:1044 (liblibgolang.so.0.1+0x5f07)
    #7 _chanselect2<true> golang/runtime/libgolang.cpp:968 (liblibgolang.so.0.1+0x6665)
    #8 _chanselect golang/runtime/libgolang.cpp:963 (liblibgolang.so.0.1+0x6665)
    #9 select golang/libgolang.h:386 (_golang_test.so+0x14fc1)
    #10 operator() golang/runtime/libgolang_test.cpp:320 (_golang_test.so+0x14fc1)
    #11 __invoke_impl<void, __test_close_wakeup_all(bool)::<lambda()>&> /usr/include/c++/8/bits/invoke.h:60 (_golang_test.so+0x14fc1)
    #12 __invoke<__test_close_wakeup_all(bool)::<lambda()>&> /usr/include/c++/8/bits/invoke.h:95 (_golang_test.so+0x14fc1)
    #13 __call<void> /usr/include/c++/8/functional:400 (_golang_test.so+0x14fc1)
    #14 operator()<> /usr/include/c++/8/functional:484 (_golang_test.so+0x14fc1)
    #15 _M_invoke /usr/include/c++/8/bits/std_function.h:297 (_golang_test.so+0x14fc1)
    #16 std::function<void ()>::operator()() const /usr/include/c++/8/bits/std_function.h:687 (_golang_test.so+0x1850c)
    #17 operator() golang/libgolang.h:273 (_golang_test.so+0x183da)
    #18 _FUN golang/libgolang.h:271 (_golang_test.so+0x183da)
    #19 <null> <null> (python2.7+0x1929e3)

  Thread T9 (tid=4661, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:915 (libtsan.so.0+0x2be1b)
    #1 PyThread_start_new_thread <null> (python2.7+0x19299f)
    #2 _taskgo golang/runtime/libgolang.cpp:123 (liblibgolang.so.0.1+0x3f98)
    #3 go<__test_close_wakeup_all(bool)::<lambda()> > golang/libgolang.h:271 (_golang_test.so+0x16c94)
    #4 __test_close_wakeup_all(bool) golang/runtime/libgolang_test.cpp:298 (_golang_test.so+0x16c94)
    #5 _test_close_wakeup_all_vsselect() golang/runtime/libgolang_test.cpp:342 (_golang_test.so+0x16f64)
    #6 __pyx_pf_6golang_12_golang_test_24test_close_wakeup_all_vsselect golang/_golang_test.cpp:4013 (_golang_test.so+0xd92a)
    #7 __pyx_pw_6golang_12_golang_test_25test_close_wakeup_all_vsselect golang/_golang_test.cpp:3978 (_golang_test.so+0xd92a)
    #8 PyEval_EvalFrameEx <null> (python2.7+0xf68b4)

  Thread T10 (tid=4662, running) created by main thread at:
    #0 pthread_create ../../../../src/libsanitizer/tsan/tsan_interceptors.cc:915 (libtsan.so.0+0x2be1b)
    #1 PyThread_start_new_thread <null> (python2.7+0x19299f)
    #2 _taskgo golang/runtime/libgolang.cpp:123 (liblibgolang.so.0.1+0x3f98)
    #3 go<__test_close_wakeup_all(bool)::<lambda()> > golang/libgolang.h:271 (_golang_test.so+0x16d96)
    #4 __test_close_wakeup_all(bool) golang/runtime/libgolang_test.cpp:315 (_golang_test.so+0x16d96)
    #5 _test_close_wakeup_all_vsselect() golang/runtime/libgolang_test.cpp:342 (_golang_test.so+0x16f64)
    #6 __pyx_pf_6golang_12_golang_test_24test_close_wakeup_all_vsselect golang/_golang_test.cpp:4013 (_golang_test.so+0xd92a)
    #7 __pyx_pw_6golang_12_golang_test_25test_close_wakeup_all_vsselect golang/_golang_test.cpp:3978 (_golang_test.so+0xd92a)
    #8 PyEval_EvalFrameEx <null> (python2.7+0xf68b4)

and reliably crashes under regular builds.
parent 65c43848
......@@ -173,14 +173,16 @@ cdef extern from * nogil:
extern void _test_chan_cpp();
extern void _test_chan_vs_stackdeadwhileparked();
extern void _test_go_cpp();
extern void _test_close_wakeup_all();
extern void _test_close_wakeup_all_vsrecv();
extern void _test_close_wakeup_all_vsselect();
extern void _test_select_win_while_queue();
"""
void _test_chan_cpp_refcount() except +topyexc
void _test_chan_cpp() except +topyexc
void _test_chan_vs_stackdeadwhileparked() except +topyexc
void _test_go_cpp() except +topyexc
void _test_close_wakeup_all() except +topyexc
void _test_close_wakeup_all_vsrecv() except +topyexc
void _test_close_wakeup_all_vsselect() except +topyexc
void _test_select_win_while_queue() except +topyexc
def test_chan_cpp_refcount():
with nogil:
......@@ -194,9 +196,12 @@ def test_chan_vs_stackdeadwhileparked():
def test_go_cpp():
with nogil:
_test_go_cpp()
def test_close_wakeup_all():
def test_close_wakeup_all_vsrecv():
with nogil:
_test_close_wakeup_all()
_test_close_wakeup_all_vsrecv()
def test_close_wakeup_all_vsselect():
with nogil:
_test_close_wakeup_all_vsselect()
def test_select_win_while_queue():
with nogil:
_test_select_win_while_queue()
......@@ -944,6 +944,18 @@ int _chanselect(const _selcase *casev, int casec) {
_blockforever();
// second pass: subscribe and wait on all rx/tx cases
// keep all channels alive while _chanselect2 runs.
// we need to keep them alive because upon wakeup:
// - __chanselect2 needs to unregister all registered waiters from all channels,
// - _chanselect2<onstack=false> needs to access casev[selected].ch->_elemsize.
for (int i=0; i < casec; i++)
_chanxincref(casev[i].ch);
defer([&]() {
for (int i=0; i < casec; i++)
_chanxdecref(casev[i].ch);
});
return (_runtime->flags & STACK_DEAD_WHILE_PARKED) \
? _chanselect2</*onstack=*/false>(casev, casec, nv)
: _chanselect2</*onstack=*/true> (casev, casec, nv);
......@@ -1007,6 +1019,8 @@ template<> int _chanselect2</*onstack=*/false>(const _selcase *casev, int casec,
int selected = __chanselect2(casev_onheap.get(), casec, nv, g.get());
// copy data back to original rx location.
// NOTE it is ok to access cas->ch because we pin all channels to be alive
// while _chanselect2 runs.
_selcase *cas = &casev_onheap[selected];
if (cas->op == _CHANRECV) {
const _selcase *cas0 = &casev[selected];
......@@ -1028,7 +1042,7 @@ static int __chanselect2(const _selcase *casev, int casec, const vector<int>& nv
defer([&]() {
for (int i = 0; i < waitc; i++) {
_RecvSendWaiting *w = &waitv[i];
w->chan->_mu.lock();
w->chan->_mu.lock(); // NOTE we pin all channels alive before entering _chanselect2
list_del_init(&w->in_rxtxq); // thanks to _init used in _dequeWaiter
w->chan->_mu.unlock(); // it is ok to del twice even if w was already removed
}
......
......@@ -285,9 +285,9 @@ static void _work(int i, chan<structZ> done) {
// verify that chan close wakes up all consumers atomically - in other words
// that it is safe to e.g. destroy the channel after recv wakeup caused by close.
//
// this also verifies that recv, upon wakeup, does not use channel
// this also verifies that recv and select, upon wakeup, do not use channel
// object when it could be already destroyed.
void _test_close_wakeup_all() {
void __test_close_wakeup_all(bool vs_select) {
int i, N = 100;
auto ch = makechan<int>();
auto _ch = ch._rawchan();
......@@ -295,11 +295,12 @@ void _test_close_wakeup_all() {
// ch.recv subscriber that destroys ch right after wakeup.
// ch ownership is transferred to this goroutine.
go([ch, done]() mutable {
go([ch, done, vs_select]() mutable {
ch.recv();
// destroy ch _before_ signalling on done. This should be safe to do
// as other workers vvv don't use ch after wakeup from ch.recv().
ASSERT(_chanrefcnt(ch._rawchan()) == 1);
if (!vs_select)
ASSERT(_chanrefcnt(ch._rawchan()) == 1);
ch = NULL;
done.send(structZ{});
});
......@@ -308,11 +309,18 @@ void _test_close_wakeup_all() {
ch = NULL;
ASSERT(_chanrefcnt(_ch) == 1);
// many other ch.recv subscribers queued to ch.recvq
// many other ch.recv or select({ch.recv}) subscribers queued to ch.recvq
// their lifetime is subset of ^^^ subscriber lifetime; they don't own a ch reference.
for (i=0; i < N; i++) {
go([_ch, done]() {
_chanrecv(_ch, NULL);
go([_ch, done, vs_select]() {
if (!vs_select) {
_chanrecv(_ch, NULL);
} else {
int rx;
select({
_selrecv(_ch, &rx)
});
}
done.send(structZ{});
});
}
......@@ -322,13 +330,15 @@ void _test_close_wakeup_all() {
// ch.close() must wake up all workers atomically. If it is not the case,
// this will reliably (N >> 1) trigger assert in chan decref on len(ch.recvq) == 0.
ASSERT(_chanrefcnt(_ch) == 1);
ASSERT(_chanrefcnt(_ch) == (vs_select ? 1+N : 1));
_chanclose(_ch);
// wait till all workers finish
for (i=0; i < 1+N; i++)
done.recv();
}
void _test_close_wakeup_all_vsrecv() { __test_close_wakeup_all(/*vs_select=*/false); }
void _test_close_wakeup_all_vsselect() { __test_close_wakeup_all(/*vs_select=*/true); }
// verify that select correctly handles situation where a case that is already
// queued wins while select queues other cases.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment