Commit 83259a1b authored by Kirill Smelkov's avatar Kirill Smelkov

golang: Move channels implementation from golang.py to golang.pyx

Plain code movement with just s/panic/pypanic/ as in golang.pyx panic is
aleady there and means semantically different thing. Moved code, even
though it lives in golang.pyx, is still Python code and requires Python
runtime and GIL. We'll be splitting channels implementation into nogil
world in the following patches.

Just plain movement to Cython brings the following speedup:

 (on i7@2.6GHz)

thread runtime:

    name             old time/op  new time/op  delta
    go               26.6µs ± 1%  21.7µs ± 1%  -18.54%  (p=0.000 n=10+10)
    chan             13.7µs ± 1%   9.9µs ± 4%  -27.80%  (p=0.000 n=10+10)
    select           29.3µs ± 2%  19.2µs ± 4%  -34.65%  (p=0.000 n=9+9)
    def              55.0ns ± 0%  58.0ns ± 0%   +5.45%  (p=0.000 n=10+10)
    func_def         44.0µs ± 1%  44.4µs ± 0%   +0.72%  (p=0.002 n=10+10)
    call             64.0ns ± 0%  63.0ns ± 0%   -1.56%  (p=0.002 n=8+10)
    func_call        1.09µs ± 1%  1.05µs ± 1%   -2.96%  (p=0.000 n=10+10)
    try_finally       139ns ± 2%   135ns ± 0%   -2.60%  (p=0.000 n=10+10)
    defer            2.36µs ± 1%  2.36µs ± 1%     ~     (p=0.617 n=10+10)
    workgroup_empty  58.1µs ± 1%  49.0µs ± 1%  -15.61%  (p=0.000 n=10+10)
    workgroup_raise  72.7µs ± 1%  62.6µs ± 1%  -13.88%  (p=0.000 n=10+10)

gevent runtime:

    name             old time/op  new time/op  delta
    go               28.6µs ± 0%  25.4µs ± 0%  -11.20%  (p=0.000 n=8+9)
    chan             15.8µs ± 1%  12.2µs ± 1%  -22.62%  (p=0.000 n=10+10)
    select           33.1µs ± 1%  23.3µs ± 2%  -29.60%  (p=0.000 n=10+10)
    def              55.0ns ± 0%  56.0ns ± 0%   +1.82%  (p=0.000 n=10+10)
    func_def         44.4µs ± 2%  43.0µs ± 1%   -3.06%  (p=0.000 n=10+9)
    call             64.0ns ± 2%  69.0ns ± 0%   +7.81%  (p=0.000 n=10+10)
    func_call        1.06µs ± 0%  1.06µs ± 1%     ~     (p=0.913 n=8+9)
    try_finally       136ns ± 0%   139ns ± 0%   +2.21%  (p=0.000 n=9+10)
    defer            2.29µs ± 1%  2.38µs ± 2%   +3.58%  (p=0.000 n=10+10)
    workgroup_empty  73.8µs ± 1%  70.5µs ± 1%   -4.48%  (p=0.000 n=10+10)
    workgroup_raise  94.1µs ± 0%  90.6µs ± 0%   -3.69%  (p=0.000 n=10+10)
parent f971a2a8
......@@ -36,12 +36,9 @@ __version__ = "0.0.2"
__all__ = ['go', 'chan', 'select', 'default', 'nilchan', 'defer', 'panic', 'recover', 'func', 'gimport']
from golang._gopath import gimport # make gimport available from golang
import inspect, threading, collections, random, sys
import inspect, sys
import decorator
import six
from golang._pycompat import im_class
# @func is a necessary decorator for functions for selected golang features to work.
#
......@@ -176,510 +173,9 @@ def defer(f):
from ._golang import \
pygo as go, \
chan, \
select, \
default, \
nilchan, \
_PanicError, \
pypanic as panic
# _RecvWaiting represents a receiver waiting on a chan.
class _RecvWaiting(object):
# .group _WaitGroup group of waiters this receiver is part of
# .chan chan channel receiver is waiting on
#
# on wakeup: sender|closer -> receiver:
# .rx_ rx_ for recv
def __init__(self, group, ch):
self.group = group
self.chan = ch
group.register(self)
# wakeup notifies waiting receiver that recv_ completed.
def wakeup(self, rx, ok):
self.rx_ = (rx, ok)
self.group.wakeup()
# _SendWaiting represents a sender waiting on a chan.
class _SendWaiting(object):
# .group _WaitGroup group of waiters this sender is part of
# .chan chan channel sender is waiting on
# .obj object that was passed to send
#
# on wakeup: receiver|closer -> sender:
# .ok bool whether send succeeded (it will not on close)
def __init__(self, group, ch, obj):
self.group = group
self.chan = ch
self.obj = obj
group.register(self)
# wakeup notifies waiting sender that send completed.
def wakeup(self, ok):
self.ok = ok
self.group.wakeup()
# _WaitGroup is a group of waiting senders and receivers.
#
# Only 1 waiter from the group can succeed waiting.
class _WaitGroup(object):
# ._waitv [] of _{Send|Recv}Waiting
# ._sema semaphore used for wakeup
#
# ._mu lock NOTE ∀ chan order is always: chan._mu > ._mu
#
# on wakeup: sender|receiver -> group:
# .which _{Send|Recv}Waiting instance which succeeded waiting.
def __init__(self):
self._waitv = []
self._sema = threading.Lock() # in python it is valid to release lock from another thread.
self._sema.acquire()
self._mu = threading.Lock()
self.which = None
def register(self, wait):
self._waitv.append(wait)
# try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
#
# -> ok: true if won, false - if not.
def try_to_win(self, waiter):
with self._mu:
if self.which is not None:
return False
else:
self.which = waiter
return True
# wait waits for winning case of group to complete.
def wait(self):
self._sema.acquire()
# wakeup wakes up the group.
#
# prior to wakeup try_to_win must have been called.
# in practice this means that waiters queued to chan.{_send|_recv}q must
# be dequeued with _dequeWaiter.
def wakeup(self):
assert self.which is not None
self._sema.release()
# dequeAll removes all registered waiters from their wait queues.
def dequeAll(self):
for w in self._waitv:
ch = w.chan
if isinstance(w, _SendWaiting):
queue = ch._sendq
else:
assert isinstance(w, _RecvWaiting)
queue = ch._recvq
with ch._mu:
try:
queue.remove(w)
except ValueError:
pass
# _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
#
# the channel owning {_recv|_send}q must be locked.
def _dequeWaiter(queue):
while len(queue) > 0:
w = queue.popleft()
# if this waiter can win its group - return it.
# if not - someone else from its group already has won, and so we anyway have
# to remove the waiter from the queue.
if w.group.try_to_win(w):
return w
return None
# chan is a channel with Go semantic.
class chan(object):
# ._cap channel capacity
# ._mu lock
# ._dataq deque *: data buffer
# ._recvq deque _RecvWaiting: blocked receivers
# ._sendq deque _SendWaiting: blocked senders
# ._closed bool
def __init__(self, size=0):
self._cap = size
self._mu = threading.Lock()
self._dataq = collections.deque()
self._recvq = collections.deque()
self._sendq = collections.deque()
self._closed = False
# send sends object to a receiver.
#
# .send(obj)
def send(self, obj):
if self is nilchan:
_blockforever()
self._mu.acquire()
if 1:
ok = self._trysend(obj)
if ok:
return
g = _WaitGroup()
me = _SendWaiting(g, self, obj)
self._sendq.append(me)
self._mu.release()
g.wait()
assert g.which is me
if not me.ok:
panic("send on closed channel")
# recv_ is "comma-ok" version of recv.
#
# ok is true - if receive was delivered by a successful send.
# ok is false - if receive is due to channel being closed and empty.
#
# .recv_() -> (rx, ok)
def recv_(self):
if self is nilchan:
_blockforever()
self._mu.acquire()
if 1:
rx_, ok = self._tryrecv()
if ok:
return rx_
g = _WaitGroup()
me = _RecvWaiting(g, self)
self._recvq.append(me)
self._mu.release()
g.wait()
assert g.which is me
return me.rx_
# recv receives from the channel.
#
# .recv() -> rx
def recv(self):
rx, _ = self.recv_()
return rx
# _trysend(obj) -> ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _trysend(self, obj):
if self._closed:
self._mu.release()
panic("send on closed channel")
# synchronous channel
if self._cap == 0:
recv = _dequeWaiter(self._recvq)
if recv is None:
return False
self._mu.release()
recv.wakeup(obj, True)
return True
# buffered channel
else:
if len(self._dataq) >= self._cap:
return False
self._dataq.append(obj)
recv = _dequeWaiter(self._recvq)
if recv is not None:
rx = self._dataq.popleft()
self._mu.release()
recv.wakeup(rx, True)
else:
self._mu.release()
return True
# _tryrecv() -> rx_=(rx, ok), ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _tryrecv(self):
# buffered
if len(self._dataq) > 0:
rx = self._dataq.popleft()
# wakeup a blocked writer, if there is any
send = _dequeWaiter(self._sendq)
if send is not None:
self._dataq.append(send.obj)
self._mu.release()
send.wakeup(True)
else:
self._mu.release()
return (rx, True), True
# closed
if self._closed:
self._mu.release()
return (None, False), True
# sync | empty: there is waiting writer
send = _dequeWaiter(self._sendq)
if send is None:
return (None, False), False
self._mu.release()
rx = send.obj
send.wakeup(True)
return (rx, True), True
# close closes sending side of the channel.
def close(self):
if self is nilchan:
panic("close of nil channel")
recvv = []
sendv = []
with self._mu:
if self._closed:
panic("close of closed channel")
self._closed = True
# schedule: wake-up all readers
while 1:
recv = _dequeWaiter(self._recvq)
if recv is None:
break
recvv.append(recv)
# schedule: wake-up all writers (they will panic)
while 1:
send = _dequeWaiter(self._sendq)
if send is None:
break
sendv.append(send)
# perform scheduled wakeups outside of ._mu
for recv in recvv:
recv.wakeup(None, False)
for send in sendv:
send.wakeup(False)
def __len__(self):
return len(self._dataq)
def __repr__(self):
if self is nilchan:
return "nilchan"
else:
return super(chan, self).__repr__()
# nilchan is the nil channel.
#
# On nil channel: send/recv block forever; close panics.
nilchan = chan(None) # TODO -> <chan*>(NULL) after move to Cython
# default represents default case for select.
default = object()
# unbound chan.{send,recv,recv_}
_chan_send = chan.send
_chan_recv = chan.recv
_chan_recv_ = chan.recv_
if six.PY2:
# on py3 class.func gets the func; on py2 - unbound_method(func)
_chan_send = _chan_send.__func__
_chan_recv = _chan_recv.__func__
_chan_recv_ = _chan_recv_.__func__
# select executes one ready send or receive channel case.
#
# if no case is ready and default case was provided, select chooses default.
# if no case is ready and default was not provided, select blocks until one case becomes ready.
#
# returns: selected case number and receive info (None if send case was selected).
#
# example:
#
# _, _rx = select(
# ch1.recv, # 0
# ch2.recv_, # 1
# (ch2.send, obj2), # 2
# default, # 3
# )
# if _ == 0:
# # _rx is what was received from ch1
# ...
# if _ == 1:
# # _rx is (rx, ok) of what was received from ch2
# ...
# if _ == 2:
# # we know obj2 was sent to ch2
# ...
# if _ == 3:
# # default case
# ...
def select(*casev):
# select promise: if multiple cases are ready - one will be selected randomly
ncasev = list(enumerate(casev))
random.shuffle(ncasev)
# first pass: poll all cases and bail out in the end if default was provided
recvv = [] # [](n, ch, commaok)
sendv = [] # [](n, ch, tx)
ndefault = None
for (n, case) in ncasev:
# default: remember we have it
if case is default:
if ndefault is not None:
panic("select: multiple default")
ndefault = n
# send
elif isinstance(case, tuple):
send, tx = case
if im_class(send) is not chan:
panic("select: send on non-chan: %r" % (im_class(send),))
if send.__func__ is not _chan_send:
panic("select: send expected: %r" % (send,))
ch = send.__self__
if ch is not nilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
ok = ch._trysend(tx)
if ok:
return n, None
ch._mu.release()
sendv.append((n, ch, tx))
# recv
else:
recv = case
if im_class(recv) is not chan:
panic("select: recv on non-chan: %r" % (im_class(recv),))
if recv.__func__ is _chan_recv:
commaok = False
elif recv.__func__ is _chan_recv_:
commaok = True
else:
panic("select: recv expected: %r" % (recv,))
ch = recv.__self__
if ch is not nilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
ch._mu.release()
recvv.append((n, ch, commaok))
# execute default if we have it
if ndefault is not None:
return ndefault, None
# select{} or with nil-channels only -> block forever
if len(recvv) + len(sendv) == 0:
_blockforever()
# second pass: subscribe and wait on all rx/tx cases
g = _WaitGroup()
# selected returns what was selected in g.
# the return signature is the one of select.
def selected():
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
panic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
try:
for n, ch, tx in sendv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
ok = ch._trysend(tx)
if ok:
# don't let already queued cases win
g.which = "tx prepoll won" # !None
return n, None
w = _SendWaiting(g, ch, tx)
w.sel_n = n
ch._sendq.append(w)
ch._mu.release()
for n, ch, commaok in recvv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
rx_, ok = ch._tryrecv()
if ok:
# don't let already queued cases win
g.which = "rx prepoll won" # !None
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
w = _RecvWaiting(g, ch)
w.sel_n = n
w.sel_commaok = commaok
ch._recvq.append(w)
ch._mu.release()
return selected()
finally:
# unsubscribe not-succeeded waiters
g.dequeAll()
# _blockforever blocks current goroutine forever.
def _blockforever():
# take a lock twice. It will forever block on the second lock attempt.
# Under gevent, similarly to Go, this raises "LoopExit: This operation
# would block forever", if there are no other greenlets scheduled to be run.
dead = threading.Lock()
dead.acquire()
dead.acquire()
......@@ -35,6 +35,9 @@ from cpython cimport Py_INCREF, Py_DECREF, PY_MAJOR_VERSION
from cython cimport final
import sys
import six
import threading, collections, random
from golang._pycompat import im_class
# ---- panic ----
......@@ -132,6 +135,512 @@ cdef void __goviac(void *arg) nogil:
raise # XXX exception -> exit program with traceback (same as in go) ?
# ---- channels ----
# _RecvWaiting represents a receiver waiting on a chan.
class _RecvWaiting(object):
# .group _WaitGroup group of waiters this receiver is part of
# .chan chan channel receiver is waiting on
#
# on wakeup: sender|closer -> receiver:
# .rx_ rx_ for recv
def __init__(self, group, ch):
self.group = group
self.chan = ch
group.register(self)
# wakeup notifies waiting receiver that recv_ completed.
def wakeup(self, rx, ok):
self.rx_ = (rx, ok)
self.group.wakeup()
# _SendWaiting represents a sender waiting on a chan.
class _SendWaiting(object):
# .group _WaitGroup group of waiters this sender is part of
# .chan chan channel sender is waiting on
# .obj object that was passed to send
#
# on wakeup: receiver|closer -> sender:
# .ok bool whether send succeeded (it will not on close)
def __init__(self, group, ch, obj):
self.group = group
self.chan = ch
self.obj = obj
group.register(self)
# wakeup notifies waiting sender that send completed.
def wakeup(self, ok):
self.ok = ok
self.group.wakeup()
# _WaitGroup is a group of waiting senders and receivers.
#
# Only 1 waiter from the group can succeed waiting.
class _WaitGroup(object):
# ._waitv [] of _{Send|Recv}Waiting
# ._sema semaphore used for wakeup
#
# ._mu lock NOTE ∀ chan order is always: chan._mu > ._mu
#
# on wakeup: sender|receiver -> group:
# .which _{Send|Recv}Waiting instance which succeeded waiting.
def __init__(self):
self._waitv = []
self._sema = threading.Lock() # in python it is valid to release lock from another thread.
self._sema.acquire()
self._mu = threading.Lock()
self.which = None
def register(self, wait):
self._waitv.append(wait)
# try_to_win tries to win waiter after it was dequeued from a channel's {_send|_recv}q.
#
# -> ok: true if won, false - if not.
def try_to_win(self, waiter):
with self._mu:
if self.which is not None:
return False
else:
self.which = waiter
return True
# wait waits for winning case of group to complete.
def wait(self):
self._sema.acquire()
# wakeup wakes up the group.
#
# prior to wakeup try_to_win must have been called.
# in practice this means that waiters queued to chan.{_send|_recv}q must
# be dequeued with _dequeWaiter.
def wakeup(self):
assert self.which is not None
self._sema.release()
# dequeAll removes all registered waiters from their wait queues.
def dequeAll(self):
for w in self._waitv:
ch = w.chan
if isinstance(w, _SendWaiting):
queue = ch._sendq
else:
assert isinstance(w, _RecvWaiting)
queue = ch._recvq
with ch._mu:
try:
queue.remove(w)
except ValueError:
pass
# _dequeWaiter dequeues a send or recv waiter from a channel's _recvq or _sendq.
#
# the channel owning {_recv|_send}q must be locked.
def _dequeWaiter(queue):
while len(queue) > 0:
w = queue.popleft()
# if this waiter can win its group - return it.
# if not - someone else from its group already has won, and so we anyway have
# to remove the waiter from the queue.
if w.group.try_to_win(w):
return w
return None
# chan is a channel with Go semantic.
class chan(object):
# ._cap channel capacity
# ._mu lock
# ._dataq deque *: data buffer
# ._recvq deque _RecvWaiting: blocked receivers
# ._sendq deque _SendWaiting: blocked senders
# ._closed bool
def __init__(self, size=0):
self._cap = size
self._mu = threading.Lock()
self._dataq = collections.deque()
self._recvq = collections.deque()
self._sendq = collections.deque()
self._closed = False
# send sends object to a receiver.
#
# .send(obj)
def send(self, obj):
if self is nilchan:
_blockforever()
self._mu.acquire()
if 1:
ok = self._trysend(obj)
if ok:
return
g = _WaitGroup()
me = _SendWaiting(g, self, obj)
self._sendq.append(me)
self._mu.release()
g.wait()
assert g.which is me
if not me.ok:
pypanic("send on closed channel")
# recv_ is "comma-ok" version of recv.
#
# ok is true - if receive was delivered by a successful send.
# ok is false - if receive is due to channel being closed and empty.
#
# .recv_() -> (rx, ok)
def recv_(self):
if self is nilchan:
_blockforever()
self._mu.acquire()
if 1:
rx_, ok = self._tryrecv()
if ok:
return rx_
g = _WaitGroup()
me = _RecvWaiting(g, self)
self._recvq.append(me)
self._mu.release()
g.wait()
assert g.which is me
return me.rx_
# recv receives from the channel.
#
# .recv() -> rx
def recv(self):
rx, _ = self.recv_()
return rx
# _trysend(obj) -> ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _trysend(self, obj):
if self._closed:
self._mu.release()
pypanic("send on closed channel")
# synchronous channel
if self._cap == 0:
recv = _dequeWaiter(self._recvq)
if recv is None:
return False
self._mu.release()
recv.wakeup(obj, True)
return True
# buffered channel
else:
if len(self._dataq) >= self._cap:
return False
self._dataq.append(obj)
recv = _dequeWaiter(self._recvq)
if recv is not None:
rx = self._dataq.popleft()
self._mu.release()
recv.wakeup(rx, True)
else:
self._mu.release()
return True
# _tryrecv() -> rx_=(rx, ok), ok
#
# must be called with ._mu held.
# if ok or panic - returns with ._mu released.
# if !ok - returns with ._mu still being held.
def _tryrecv(self):
# buffered
if len(self._dataq) > 0:
rx = self._dataq.popleft()
# wakeup a blocked writer, if there is any
send = _dequeWaiter(self._sendq)
if send is not None:
self._dataq.append(send.obj)
self._mu.release()
send.wakeup(True)
else:
self._mu.release()
return (rx, True), True
# closed
if self._closed:
self._mu.release()
return (None, False), True
# sync | empty: there is waiting writer
send = _dequeWaiter(self._sendq)
if send is None:
return (None, False), False
self._mu.release()
rx = send.obj
send.wakeup(True)
return (rx, True), True
# close closes sending side of the channel.
def close(self):
if self is nilchan:
pypanic("close of nil channel")
recvv = []
sendv = []
with self._mu:
if self._closed:
pypanic("close of closed channel")
self._closed = True
# schedule: wake-up all readers
while 1:
recv = _dequeWaiter(self._recvq)
if recv is None:
break
recvv.append(recv)
# schedule: wake-up all writers (they will panic)
while 1:
send = _dequeWaiter(self._sendq)
if send is None:
break
sendv.append(send)
# perform scheduled wakeups outside of ._mu
for recv in recvv:
recv.wakeup(None, False)
for send in sendv:
send.wakeup(False)
def __len__(self):
return len(self._dataq)
def __repr__(self):
if self is nilchan:
return "nilchan"
else:
return super(chan, self).__repr__()
# nilchan is the nil channel.
#
# On nil channel: send/recv block forever; close panics.
nilchan = chan(None) # TODO -> <chan*>(NULL) after move to Cython
# default represents default case for select.
default = object()
# unbound chan.{send,recv,recv_}
_chan_send = chan.send
_chan_recv = chan.recv
_chan_recv_ = chan.recv_
if six.PY2:
# on py3 class.func gets the func; on py2 - unbound_method(func)
_chan_send = _chan_send.__func__
_chan_recv = _chan_recv.__func__
_chan_recv_ = _chan_recv_.__func__
# select executes one ready send or receive channel case.
#
# if no case is ready and default case was provided, select chooses default.
# if no case is ready and default was not provided, select blocks until one case becomes ready.
#
# returns: selected case number and receive info (None if send case was selected).
#
# example:
#
# _, _rx = select(
# ch1.recv, # 0
# ch2.recv_, # 1
# (ch2.send, obj2), # 2
# default, # 3
# )
# if _ == 0:
# # _rx is what was received from ch1
# ...
# if _ == 1:
# # _rx is (rx, ok) of what was received from ch2
# ...
# if _ == 2:
# # we know obj2 was sent to ch2
# ...
# if _ == 3:
# # default case
# ...
def select(*casev):
# select promise: if multiple cases are ready - one will be selected randomly
ncasev = list(enumerate(casev))
random.shuffle(ncasev)
# first pass: poll all cases and bail out in the end if default was provided
recvv = [] # [](n, ch, commaok)
sendv = [] # [](n, ch, tx)
ndefault = None
for (n, case) in ncasev:
# default: remember we have it
if case is default:
if ndefault is not None:
pypanic("select: multiple default")
ndefault = n
# send
elif isinstance(case, tuple):
send, tx = case
if im_class(send) is not chan:
pypanic("select: send on non-chan: %r" % (im_class(send),))
if send.__func__ is not _chan_send:
pypanic("select: send expected: %r" % (send,))
ch = send.__self__
if ch is not nilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
ok = ch._trysend(tx)
if ok:
return n, None
ch._mu.release()
sendv.append((n, ch, tx))
# recv
else:
recv = case
if im_class(recv) is not chan:
pypanic("select: recv on non-chan: %r" % (im_class(recv),))
if recv.__func__ is _chan_recv:
commaok = False
elif recv.__func__ is _chan_recv_:
commaok = True
else:
pypanic("select: recv expected: %r" % (recv,))
ch = recv.__self__
if ch is not nilchan: # nil chan is never ready
ch._mu.acquire()
if 1:
rx_, ok = ch._tryrecv()
if ok:
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
ch._mu.release()
recvv.append((n, ch, commaok))
# execute default if we have it
if ndefault is not None:
return ndefault, None
# select{} or with nil-channels only -> block forever
if len(recvv) + len(sendv) == 0:
_blockforever()
# second pass: subscribe and wait on all rx/tx cases
g = _WaitGroup()
# selected returns what was selected in g.
# the return signature is the one of select.
def selected():
g.wait()
sel = g.which
if isinstance(sel, _SendWaiting):
if not sel.ok:
pypanic("send on closed channel")
return sel.sel_n, None
if isinstance(sel, _RecvWaiting):
rx_ = sel.rx_
if not sel.sel_commaok:
rx, ok = rx_
rx_ = rx
return sel.sel_n, rx_
raise AssertionError("select: unreachable")
try:
for n, ch, tx in sendv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
ok = ch._trysend(tx)
if ok:
# don't let already queued cases win
g.which = "tx prepoll won" # !None
return n, None
w = _SendWaiting(g, ch, tx)
w.sel_n = n
ch._sendq.append(w)
ch._mu.release()
for n, ch, commaok in recvv:
ch._mu.acquire()
with g._mu:
# a case that we previously queued already won
if g.which is not None:
ch._mu.release()
return selected()
rx_, ok = ch._tryrecv()
if ok:
# don't let already queued cases win
g.which = "rx prepoll won" # !None
if not commaok:
rx, ok = rx_
rx_ = rx
return n, rx_
w = _RecvWaiting(g, ch)
w.sel_n = n
w.sel_commaok = commaok
ch._recvq.append(w)
ch._mu.release()
return selected()
finally:
# unsubscribe not-succeeded waiters
g.dequeAll()
# _blockforever blocks current goroutine forever.
def _blockforever():
# take a lock twice. It will forever block on the second lock attempt.
# Under gevent, similarly to Go, this raises "LoopExit: This operation
# would block forever", if there are no other greenlets scheduled to be run.
dead = threading.Lock()
dead.acquire()
dead.acquire()
# ---- init libgolang runtime ---
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
......
......@@ -30,7 +30,7 @@ from six.moves import range as xrange
import gc, weakref
import golang
from golang import _chan_recv, _chan_send
from golang._golang import _chan_recv, _chan_send
from golang._pycompat import im_class
# pyx/c/c++ tests -> test_pyx_*
......@@ -693,13 +693,14 @@ def bench_select(b):
def test_blockforever():
B = golang._blockforever
from golang import _golang
B = _golang._blockforever
def _(): panic("t: blocks forever")
golang._blockforever = _
_golang._blockforever = _
try:
_test_blockforever()
finally:
golang._blockforever = B
_golang._blockforever = B
def _test_blockforever():
z = nilchan
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment