Commit eb8a1fef authored by Kirill Smelkov's avatar Kirill Smelkov

golang: Fix race in chan._trysend

For buffered channel _trysend, on success, was unlocking ch._mu too
early - before accessing ch._dataq with ch._dataq.popleft().

Without the fix, newly added test breaks as e.g.

    golang/golang_test.py::test_chan_buf_send_vs_tryrecv_race Exception in thread Thread-3:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/kirr/src/tools/go/pygolang-master/golang/golang_test.py", line 256, in _
        assert (_, _rx) == (1, None)
    AssertionError: assert (0, 209) == (1, None)
      At index 0 diff: 0 != 1
      Full diff:
      - (0, 209)
      + (1, None)

    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
        self.run()
      File "/usr/lib/python2.7/threading.py", line 754, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/kirr/src/tools/go/pygolang-master/golang/golang_test.py", line 243, in _
        ch.send(i)
      File "/home/kirr/src/tools/go/pygolang-master/golang/__init__.py", line 340, in send
        ok = self._trysend(obj)
      File "/home/kirr/src/tools/go/pygolang-master/golang/__init__.py", line 417, in _trysend
        rx = self._dataq.popleft()
    IndexError: pop from an empty deque
parent cb5bfdd2
......@@ -412,10 +412,13 @@ class chan(object):
self._dataq.append(obj)
recv = _dequeWaiter(self._recvq)
self._mu.release()
if recv is not None:
rx = self._dataq.popleft()
self._mu.release()
recv.wakeup(rx, True)
else:
self._mu.release()
return True
# _tryrecv() -> rx_=(rx, ok), ok
......
......@@ -201,6 +201,65 @@ def test_chan():
assert w1() is None
assert w2() is None
# test for buffered chan bug when ch._mu was released too early in _trysend.
def test_chan_buf_send_vs_tryrecv_race():
# there was a bug when for buffered channel _trysend(ch) was releasing
# ch._mu before further popping element from ch._dataq. If there was
# another _tryrecv running concurrently to _trysend, that _tryrecv could
# pop the element and _trysend would in turn try to pop on empty ch._dataq
# leading to oops. The test tries to reproduce the following scenario:
#
# T1(recv) T2(send) T3(_tryrecv)
#
# recv(blocked)
#
# ch.mu.lock
# ch.dataq.append(x)
# ch.mu.unlock()
# ch.mu.lock
# ch.dataq.popleft()
#
# # oopses since T3 already
# # popped the value
# ch.dataq.popleft()
ch = chan(1) # buffered
done = chan()
N = 1000
# T1: recv(blocked)
def _():
for i in range(N):
assert ch.recv() == i
done.send(1)
go(_)
tryrecv_ctl = chan() # send <-> _tryrecv sync
# T2: send after recv is blocked -> _trysend succeeds
def _():
for i in range(N):
waitBlocked(ch.recv) # ch.recv() ^^^ entered ch._recvq
tryrecv_ctl.send('start') # signal _tryrecv to start
ch.send(i)
assert tryrecv_ctl.recv() == 'done' # wait _tryrecv to finish
done.send(1)
go(_)
# T3: _tryrecv running in parallel to _trysend
def _():
for i in range(N):
assert tryrecv_ctl.recv() == 'start'
_, _rx = select(
ch.recv, # 0
default, # 1
)
assert (_, _rx) == (1, None)
tryrecv_ctl.send('done')
done.send(1)
go(_)
for i in range(3):
done.recv()
# benchmark sync chan send/recv.
def bench_chan(b):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment