Commit c6bb9eb3 authored by Kirill Smelkov's avatar Kirill Smelkov

golang: Fix race in chan._tryrecv

For buffered channel _tryrecv, on success, was unlocking ch._mu too
early - before accessing ch._dataq with ch._dataq.append().

Without the fix, newly added test breaks as e.g.

    golang/golang_test.py::test_chan_buf_recv_vs_tryrecv_race Exception in thread Thread-3:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/kirr/src/tools/go/pygolang-master/golang/golang_test.py", line 317, in _
        assert (_, _rx) == (1, None), ('i%d' % i)
    AssertionError: i30
    assert (0, None) == (1, None)
      At index 0 diff: 0 != 1
      Full diff:
      - (0, None)
      ?  ^
      + (1, None)
      ?  ^
parent eb8a1fef
...@@ -433,10 +433,12 @@ class chan(object): ...@@ -433,10 +433,12 @@ class chan(object):
# wakeup a blocked writer, if there is any # wakeup a blocked writer, if there is any
send = _dequeWaiter(self._sendq) send = _dequeWaiter(self._sendq)
self._mu.release()
if send is not None: if send is not None:
self._dataq.append(send.obj) self._dataq.append(send.obj)
self._mu.release()
send.wakeup(True) send.wakeup(True)
else:
self._mu.release()
return (rx, True), True return (rx, True), True
......
...@@ -261,6 +261,68 @@ def test_chan_buf_send_vs_tryrecv_race(): ...@@ -261,6 +261,68 @@ def test_chan_buf_send_vs_tryrecv_race():
for i in range(3): for i in range(3):
done.recv() done.recv()
# test for buffered chan bug when ch._mu was released too early in _tryrecv.
def test_chan_buf_recv_vs_tryrecv_race():
# (see test_chan_buf_send_vs_tryrecv_race for similar problem description)
#
# T1(send) T2(recv) T3(_trysend)
#
# send(blocked)
#
# ch.mu.lock
# ch.dataq.popleft()
# send = _dequeWaiter(ch._sendq)
# ch.mu.unlock()
#
# ch.mu.lock
# len(ch.dataq) == 0 -> ok to append
#
# # erroneously succeeds sending while
# # it must not
# ch.dataq.append(x)
#
# ch.dataq.append(send.obj)
ch = chan(1) # buffered
done = chan()
N = 1000
# T1: send(blocked)
def _():
for i in range(1 + N):
ch.send(i)
done.send(1)
go(_)
trysend_ctl = chan() # recv <-> _trysend sync
# T2: recv after send is blocked -> _tryrecv succeeds
def _():
for i in range(N):
waitBlocked(ch.send) # ch.send() ^^^ entered ch._sendq
assert len(ch) == 1 # and 1 element was already buffered
trysend_ctl.send('start') # signal _trysend to start
assert ch.recv() == i
assert trysend_ctl.recv() == 'done' # wait _trysend to finish
done.send(1)
go(_)
# T3: _trysend running in parallel to _tryrecv
def _():
for i in range(N):
assert trysend_ctl.recv() == 'start'
_, _rx = select(
(ch.send, 'i%d' % i), # 0
default, # 1
)
assert (_, _rx) == (1, None), ('i%d' % i)
trysend_ctl.send('done')
done.send(1)
go(_)
for i in range(3):
done.recv()
# benchmark sync chan send/recv. # benchmark sync chan send/recv.
def bench_chan(b): def bench_chan(b):
ch = chan() ch = chan()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment