Commit 7eb9b595 authored by Jason Madden's avatar Jason Madden

Debug the test failure in TestSemaphoreMultiThread.test_dueling_threads_with_hub

And fix. Leave a comment about what was discovered. It's somewhat surprising.
parent ca2de796
Make ``AsyncResult`` print a warning when it detects improper
cross-thread usage instead of hanging.
``AsyncResult`` has *never* been safe to use from multiple threads.
It, like most gevent objects, is intended to work with greenlets from
a single thread. Using ``AsyncResult`` from multiple threads has
undefined semantics. The safest way to communicate between threads is
using an event loop async watcher.
Those undefined semantics changed in recent gevent versions, making it
more likely that an abused ``AsyncResult`` would misbehave in ways
that could cause the program to hang.
Now, when ``AsyncResult`` detects a situation that would hang, it
prints a warning to stderr. Note that this is best-effort, and hangs
are still possible, especially under PyPy 7.3.3.
At the same time, ``AsyncResult`` is tuned to behave more like it did
in older versions, meaning that the hang is once again much less
likely. If you were getting lucky and using ``AsyncResult``
successfully across threads, this may restore your luck.
...@@ -354,12 +354,16 @@ class AbstractLinkable(object): ...@@ -354,12 +354,16 @@ class AbstractLinkable(object):
# Move this to be a callback in that thread. # Move this to be a callback in that thread.
# (This relies on holding the GIL *or* ``Hub.loop.run_callback`` being # (This relies on holding the GIL *or* ``Hub.loop.run_callback`` being
# thread-safe!) # thread-safe! Note that the CFFI implementations are definitely
# NOT thread-safe. TODO: Make them? Or an alternative?)
# #
# Otherwise, print some error messages. # Otherwise, print some error messages.
# TODO: Inline this for individual links. That handles the # TODO: Inline this for individual links. That handles the
# "only while ready" case automatically. # "only while ready" case automatically. Be careful about locking in that case.
#
# TODO: Add a 'strict' mode that prevents doing this dance, since it's
# inherently not safe.
root_greenlets = None root_greenlets = None
printed_tb = False printed_tb = False
only_while_ready = not self._notify_all only_while_ready = not self._notify_all
......
...@@ -281,6 +281,31 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -281,6 +281,31 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.release() self.release()
def _handle_unswitched_notifications(self, unswitched):
# If we fail to switch to a greenlet in another thread to send
# a notification, just re-queue it, in the hopes that the
# other thread will eventually run notifications itself.
#
# We CANNOT do what the ``super()`` does and actually allow
# this notification to get run sometime in the future by
# scheduling a callback in the other thread. The algorithm
# that we use to handle cross-thread locking/unlocking was
# designed before the schedule-a-callback mechanism was
# implemented. If we allow this to be run as a callback, we
# can find ourself the victim of ``InvalidSwitchError`` (or
# worse, silent corruption) because the switch can come at an
# unexpected time: *after* the destination thread has already
# acquired the lock.
#
# This manifests in a fairly reliable test failure,
# ``gevent.tests.test__semaphore``
# ``TestSemaphoreMultiThread.test_dueling_threads_with_hub``,
# but ONLY when running in PURE_PYTHON mode.
#
# TODO: Maybe we can rewrite that part of the algorithm to be friendly to
# running the callbacks?
self._links.extend(unswitched)
def __add_link(self, link): def __add_link(self, link):
if not self._notifier: if not self._notifier:
self.rawlink(link) self.rawlink(link)
......
...@@ -29,6 +29,10 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -29,6 +29,10 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
one or more others. It has the same interface as one or more others. It has the same interface as
:class:`threading.Event` but works across greenlets. :class:`threading.Event` but works across greenlets.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.
An event object manages an internal flag that can be set to true An event object manages an internal flag that can be set to true
with the :meth:`set` method and reset to false with the with the :meth:`set` method and reset to false with the
:meth:`clear` method. The :meth:`wait` method blocks until the :meth:`clear` method. The :meth:`wait` method blocks until the
...@@ -169,12 +173,18 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -169,12 +173,18 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
""" """
A one-time event that stores a value or an exception. A one-time event that stores a value or an exception.
Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception` Like :class:`Event` it wakes up all the waiters when :meth:`set`
is called. Waiters may receive the passed value or exception by calling :meth:`get` or :meth:`set_exception` is called. Waiters may receive the passed
instead of :meth:`wait`. An :class:`AsyncResult` instance cannot be reset. value or exception by calling :meth:`get` instead of :meth:`wait`.
An :class:`AsyncResult` instance cannot be reset.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.
To pass a value call :meth:`set`. Calls to :meth:`get` (those that are currently blocking as well as To pass a value call :meth:`set`. Calls to :meth:`get` (those that
those made in the future) will return the value: are currently blocking as well as those made in the future) will
return the value::
>>> from gevent.event import AsyncResult >>> from gevent.event import AsyncResult
>>> result = AsyncResult() >>> result = AsyncResult()
...@@ -182,7 +192,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -182,7 +192,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
>>> result.get() >>> result.get()
100 100
To pass an exception call :meth:`set_exception`. This will cause :meth:`get` to raise that exception: To pass an exception call :meth:`set_exception`. This will cause
:meth:`get` to raise that exception::
>>> result = AsyncResult() >>> result = AsyncResult()
>>> result.set_exception(RuntimeError('failure')) >>> result.set_exception(RuntimeError('failure'))
...@@ -191,7 +202,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -191,7 +202,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
... ...
RuntimeError: failure RuntimeError: failure
:class:`AsyncResult` implements :meth:`__call__` and thus can be used as :meth:`link` target: :class:`AsyncResult` implements :meth:`__call__` and thus can be
used as :meth:`link` target::
>>> import gevent >>> import gevent
>>> result = AsyncResult() >>> result = AsyncResult()
...@@ -203,6 +215,7 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -203,6 +215,7 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
ZeroDivisionError ZeroDivisionError
.. note:: .. note::
The order and timing in which waiting greenlets are awakened is not determined. The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
...@@ -210,16 +223,25 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable ...@@ -210,16 +223,25 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
the waiting greenlets being awakened. These details may change in the future. the waiting greenlets being awakened. These details may change in the future.
.. versionchanged:: 1.1 .. versionchanged:: 1.1
The exact order in which waiting greenlets are awakened is not the same
as in 1.0. The exact order in which waiting greenlets
are awakened is not the same as in 1.0.
.. versionchanged:: 1.1 .. versionchanged:: 1.1
Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
merged. Callbacks :meth:`linked <rawlink>` to this object are required to
be hashable, and duplicates are merged.
.. versionchanged:: 1.5a3 .. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.
Waiting greenlets are now awakened in the order in which they
waited.
.. versionchanged:: 1.5a3 .. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them. The low-level ``rawlink`` method
(most users won't use this) now automatically unlinks waiters
before calling them.
""" """
__slots__ = ('_value', '_exc_info', '_imap_task_index') __slots__ = ('_value', '_exc_info', '_imap_task_index')
......
...@@ -267,7 +267,9 @@ class TestSemaphoreMultiThread(greentest.TestCase): ...@@ -267,7 +267,9 @@ class TestSemaphoreMultiThread(greentest.TestCase):
if not run: if not run:
break break
sem.acquire(*acquire_args) acquired = sem.acquire(*acquire_args)
assert acquire_args or acquired
if acquired:
sem.release() sem.release()
results[ix] = i results[ix] = i
if not create_hub: if not create_hub:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment