Commit f0b592b4 authored by Kirill Smelkov's avatar Kirill Smelkov

select: Don't let both a queued and a tried cases win at the same time

While the second phase of select is running we queue send/recv cases to
corresponding channels. At some point - when some of the cases are
already queued - a peer goroutine might try to send/recv on that
channel. And it will succeed because a waiter was queued to the channel.

At the same time select is continuing its enqueue loop and before enqueuing
to a channel it tries to send/recv there. If that channel became just ready
(i.e. just after select poll phase) the try to send/recv will succeed. This
means that actually 2 select cases could be executed at the same time.

Fix it by carefully checking whether some case already won before trying
to send/recv on a channel.

This fixes the test failures that were demonstrated by previous 2 patches.
parent 2fc6797c
......@@ -130,7 +130,7 @@ class _WaitGroup(object):
# ._waitv [] of _{Send|Recv}Waiting
# ._sema semaphore used for wakeup
#
# ._mu lock
# ._mu lock NOTE ∀ chan order is always: chan._mu > ._mu
#
# on wakeup: sender|receiver -> group:
# .which _{Send|Recv}Waiting instance which succeeded waiting.
......@@ -148,7 +148,7 @@ class _WaitGroup(object):
#
# -> ok: true if won, false - if not.
def try_to_win(self, waiter):
with self._mu: # NOTE order is always: waiter.chan._mu > ._mu
with self._mu:
if self.which is not None:
return False
else:
......@@ -480,12 +480,39 @@ def select(*casev):
# second pass: subscribe and wait on all rx/tx cases
g = _WaitGroup()
# selected returns what was selected in g.
# the return signature is the one of select.
def selected():
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
panic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
try:
for n, ch, tx in sendv:
ch._mu.acquire()
if 1:
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
ok = ch._trysend(tx)
if ok:
# don't let already queued cases win
g.which = "tx prepoll won" # !None
return n, None
w = _SendWaiting(g, ch, tx)
......@@ -495,9 +522,17 @@ def select(*casev):
for n, ch, commaok in recvv:
ch._mu.acquire()
if 1:
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
rx_, ok = ch._tryrecv()
if ok:
# don't let already queued cases win
g.which = "rx prepoll won" # !None
if not commaok:
rx, ok = rx_
rx_ = rx
......@@ -509,22 +544,7 @@ def select(*casev):
ch._recvq.append(w)
ch._mu.release()
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
panic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
return selected()
finally:
# unsubscribe not-succeeded waiters
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment