Commit 8243351a authored by Jason Madden's avatar Jason Madden

Add initial fix for #1739; needs more testing and a change note.

In addition to the two missing tests mentioned in test__event, I need to see if I can get a semaphore in this situation.
parent 09f3ae72
...@@ -9,7 +9,9 @@ from __future__ import division ...@@ -9,7 +9,9 @@ from __future__ import division
from __future__ import print_function from __future__ import print_function
import sys import sys
from gc import get_objects
from greenlet import greenlet
from greenlet import error as greenlet_error from greenlet import error as greenlet_error
from gevent._compat import thread_mod_name from gevent._compat import thread_mod_name
...@@ -40,6 +42,15 @@ class _FakeNotifier(object): ...@@ -40,6 +42,15 @@ class _FakeNotifier(object):
def __init__(self): def __init__(self):
self.pending = False self.pending = False
def get_roots_and_hubs():
from gevent.hub import Hub # delay import
return {
x.parent: x
for x in get_objects()
if isinstance(x, Hub)
}
class AbstractLinkable(object): class AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying # Encapsulates the standard parts of the linking and notifying
# protocol common to both repeatable events (Event, Semaphore) and # protocol common to both repeatable events (Event, Semaphore) and
...@@ -328,13 +339,77 @@ class AbstractLinkable(object): ...@@ -328,13 +339,77 @@ class AbstractLinkable(object):
# must have more links than we started with. We need to schedule the # must have more links than we started with. We need to schedule the
# wakeup. # wakeup.
self._check_and_notify() self._check_and_notify()
# If we added unswitched greenlets, however, don't add them back to the links yet. if unswitched:
# We wouldn't be able to call them in this hub anyway. self._handle_unswitched_notifications(unswitched)
# TODO: Instead of just adding these back to self._links, we should try to detect their
# "home" hub and move the callback to that hub. As it stands, there's a chance that
# if no greenlet tries to acquire/release this object in that hub, these objects def _handle_unswitched_notifications(self, unswitched):
# will never get to run. # Given a list of callable objects that raised
self._links.extend(unswitched) # ``greenlet.error`` when we called them: If we can determine
# that it is a parked greenlet (the callablle is a
# ``greenlet.switch`` method) and we can determine the hub
# that the greenlet belongs to (either its parent, or, in the
# case of a main greenlet, find a hub with the same parent as
# this greenlet object) then:
# Move this to be a callback in that thread.
# (This relies on holding the GIL *or* ``Hub.loop.run_callback`` being
# thread-safe!)
#
# Otherwise, print some error messages.
# TODO: Inline this for individual links.
root_greenlets = None
printed_tb = False
only_while_ready = not self._notify_all
while unswitched:
if only_while_ready and not self.ready():
self.__print_unswitched_warning(unswitched, printed_tb)
break
link = unswitched.pop(0)
hub = None # Also serves as a "handled?" flag
# Is it a greenlet.switch method?
if (getattr(link, '__name__', None) == 'switch'
and isinstance(getattr(link, '__self__', None), greenlet)):
glet = link.__self__
parent = glet.parent
while parent is not None:
if hasattr(parent, 'loop'): # Assuming the hub.
hub = glet.parent
break
parent = glet.parent
if hub is None:
if root_greenlets is None:
root_greenlets = get_roots_and_hubs()
hub = root_greenlets.get(glet)
if hub is not None:
hub.loop.run_callback(link, self)
if hub is None:
# We couldn't handle it
self.__print_unswitched_warning(link, printed_tb)
printed_tb = True
def __print_unswitched_warning(self, link, printed_tb):
print('gevent: error: Unable to switch to greenlet', link,
'from', self, '; crossing thread boundaries is not allowed.',
file=sys.stderr)
if not printed_tb:
printed_tb = True
print(
'gevent: error: '
'This is a result of using gevent objects from multiple threads,',
'and is a bug in the calling code.', file=sys.stderr)
import traceback
traceback.print_stack()
def _quiet_unlink_all(self, obj): def _quiet_unlink_all(self, obj):
if obj is None: if obj is None:
......
...@@ -9,6 +9,7 @@ cdef InvalidThreadUseError ...@@ -9,6 +9,7 @@ cdef InvalidThreadUseError
cdef Timeout cdef Timeout
cdef _get_thread_ident cdef _get_thread_ident
cdef bint _greenlet_imported cdef bint _greenlet_imported
cdef get_objects
cdef extern from "greenlet/greenlet.h": cdef extern from "greenlet/greenlet.h":
...@@ -32,6 +33,8 @@ cdef inline void greenlet_init(): ...@@ -32,6 +33,8 @@ cdef inline void greenlet_init():
cdef void _init() cdef void _init()
cdef dict get_roots_and_hubs()
cdef class _FakeNotifier(object): cdef class _FakeNotifier(object):
cdef bint pending cdef bint pending
...@@ -66,6 +69,9 @@ cdef class AbstractLinkable(object): ...@@ -66,6 +69,9 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False) @cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting) cpdef _notify_links(self, list arrived_while_waiting)
cdef _handle_unswitched_notifications(self, list unswitched)
cdef __print_unswitched_warning(self, link, bint printed_tb)
cpdef _drop_lock_for_switch_out(self) cpdef _drop_lock_for_switch_out(self)
cpdef _acquire_lock_for_switch_in(self) cpdef _acquire_lock_for_switch_in(self)
......
...@@ -173,9 +173,11 @@ class TestAsyncResult(greentest.TestCase): ...@@ -173,9 +173,11 @@ class TestAsyncResult(greentest.TestCase):
self.running_event.set() self.running_event.set()
print("Entering wait")
# XXX: If we use a timed wait(), the bug doesn't manifest. # XXX: If we use a timed wait(), the bug doesn't manifest.
# Why not? # Why not?
# TODO: Add a test for that.
# TODO: Add a test where it's the greenlet we spawn that does
# the wait. This is a separate code path.
self.result = self.async_result.wait() self.result = self.async_result.wait()
g_event.set() g_event.set()
...@@ -186,9 +188,7 @@ class TestAsyncResult(greentest.TestCase): ...@@ -186,9 +188,7 @@ class TestAsyncResult(greentest.TestCase):
thread = Thread() thread = Thread()
thread.start() thread.start()
try: try:
print("Waiting for thread")
thread.running_event.wait() thread.running_event.wait()
print("Thread is running")
thread.async_result.set('from main') thread.async_result.set('from main')
thread.finished_event.wait(DELAY * 5) thread.finished_event.wait(DELAY * 5)
finally: finally:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment