Commit 17128481 authored by Jason Madden's avatar Jason Madden

Better handle multi-threaded case when a greenlet from a different thread is already waiting.

Also some tweaks to the make-manylinux script; we seem to not be getting the loop I expect.
parent 491ff95b
......@@ -48,6 +48,9 @@ fi
# Needed for clock_gettime libc support on this version. This used to be spelled with LDFLAGS,
# but that is deprecated and produces a warning.
export LIBS="-lrt"
# Be sure that we get the loop we expect by default, and not
# a fallback loop.
export GEVENT_LOOP="libev-cext"
if [ -d /gevent -a -d /opt/python ]; then
# Running inside docker
......@@ -117,7 +120,10 @@ if [ -d /gevent -a -d /opt/python ]; then
python -c 'from __future__ import print_function; import gevent; print(gevent, gevent.__version__)'
python -c 'from __future__ import print_function; from gevent._compat import get_clock_info; print("clock info", get_clock_info("perf_counter"))'
python -c 'from __future__ import print_function; import greenlet; print(greenlet, greenlet.__version__)'
python -c 'from __future__ import print_function; import gevent.core; print("loop", gevent.core.loop)'
python -c 'from __future__ import print_function; import gevent.core; print("default loop", gevent.core.loop)'
# Other loops we should have
GEVENT_LOOP=libuv python -c 'from __future__ import print_function; import gevent.core; print("libuv loop", gevent.core.loop)'
GEVENT_LOOP=libev-cffi python -c 'from __future__ import print_function; import gevent.core; print("libev-cffi loop", gevent.core.loop)'
if [ -z "$GEVENTSETUP_DISABLE_ARES" ]; then
python -c 'from __future__ import print_function; import gevent.ares; print("ares", gevent.ares)'
fi
......
......@@ -10,6 +10,8 @@ from __future__ import print_function
import sys
from greenlet import error as greenlet_error
from gevent._compat import thread_mod_name
from gevent._hub_local import get_hub_noargs as get_hub
from gevent._hub_local import get_hub_if_exists
......@@ -30,6 +32,14 @@ __all__ = [
_get_thread_ident = __import__(thread_mod_name).get_ident
_allocate_thread_lock = __import__(thread_mod_name).allocate_lock
class _FakeNotifier(object):
__slots__ = (
'pending',
)
def __init__(self):
self.pending = False
class AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying
# protocol common to both repeatable events (Event, Semaphore) and
......@@ -216,9 +226,8 @@ class AbstractLinkable(object):
only_while_ready = not self._notify_all
final_link = links[-1]
done = set() # of ids
hub = self.hub
if hub is None:
hub = get_hub_if_exists()
hub = self.hub if self.hub is not None else get_hub_if_exists()
unswitched = []
while links: # remember this can be mutated
if only_while_ready and not self.ready():
break
......@@ -235,6 +244,10 @@ class AbstractLinkable(object):
self._drop_lock_for_switch_out()
try:
link(self)
except greenlet_error:
# couldn't switch to a greenlet, we must be
# running in a different thread. back on the list it goes for next time.
unswitched.append(link)
finally:
self._acquire_lock_for_switch_in()
......@@ -248,6 +261,7 @@ class AbstractLinkable(object):
if link is final_link:
break
return unswitched
def _notify_links(self, arrived_while_waiting):
# This method must hold the GIL, or be guarded with the lock that guards
......@@ -281,12 +295,13 @@ class AbstractLinkable(object):
# any more. In that case, we must not keep notifying anyone that's
# newly added after that, even if we go ready again.
try:
self._notify_link_list(self._links)
unswitched = self._notify_link_list(self._links)
# Now, those that arrived after we had begun the notification
# process. Follow the same rules, stop with those that are
# added so far to prevent starvation.
if arrived_while_waiting:
self._notify_link_list(arrived_while_waiting)
un2 = self._notify_link_list(arrived_while_waiting)
unswitched.extend(un2)
# Anything left needs to go back on the main list.
self._links.extend(arrived_while_waiting)
......@@ -300,11 +315,16 @@ class AbstractLinkable(object):
# free up thread affinity? In case of a pathological situation where
# one object was used from one thread once & first, but usually is
# used by another thread.
#
# BoundedSemaphore does this.
# Now we may be ready or not ready. If we're ready, which
# could have happened during the last link we called, then we
# must have more links than we started with. We need to schedule the
# wakeup.
self._check_and_notify()
# If we added unswitched greenlets, however, don't add them back to the links yet.
# We wouldn't be able to call them in this hub anyway.
self._links.extend(unswitched)
def _quiet_unlink_all(self, obj):
if obj is None:
......
......@@ -32,6 +32,9 @@ cdef inline void greenlet_init():
cdef void _init()
cdef class _FakeNotifier(object):
cdef bint pending
cdef class AbstractLinkable(object):
# We declare the __weakref__ here in the base (even though
# that's not really what we want) as a workaround for a Cython
......@@ -58,7 +61,7 @@ cdef class AbstractLinkable(object):
cdef int _switch_to_hub(self, the_hub) except -1
@cython.nonecheck(False)
cdef _notify_link_list(self, list links)
cdef list _notify_link_list(self, list links)
@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment