Commit 491ff95b authored by Jason Madden's avatar Jason Madden

Handle more cases, and be more careful with hubs.

Implement timeouts for the no-hub case using spin locks.
parent e3667c71
......@@ -2,4 +2,14 @@ Improve the ability to use monkey-patched locks, and
`gevent.lock.BoundedSemaphore`, across threads, especially when the
various threads might not have a gevent hub or any other active
greenlets. In particular, this handles some cases that previously
raised ``LoopExit``.
raised ``LoopExit`` or would hang.
The semaphore tries to avoid creating a hub if it seems unnecessary,
automatically creating one in the single-threaded case when it would
block, but not in the multi-threaded case. While the differences
should be correctly detected, it's possible there are corner cases
where they might not be.
If your application appears to hang acquiring semaphores, but adding a
call to ``gevent.get_hub()`` in the thread attempting to acquire the
semaphore before doing so fixes it, please file an issue.
......@@ -145,11 +145,16 @@ class AbstractLinkable(object):
def _getcurrent(self):
return getcurrent() # pylint:disable=undefined-variable
def _get_thread_ident(self):
return _get_thread_ident()
def _capture_hub(self, create):
# Subclasses should call this as the first action from any
# public method that could, in theory, block and switch
# to the hub. This may release the GIL. It may
# raise InvalidThreadUseError if the result would
# First, detect a dead hub and drop it.
while 1:
my_hub = self.hub
if my_hub is None:
......@@ -178,20 +183,30 @@ class AbstractLinkable(object):
get_hub_if_exists(),
getcurrent() # pylint:disable=undefined-variable
)
return 1
return self.hub
def _check_and_notify(self):
# If this object is ready to be notified, begin the process.
if self.ready() and self._links and not self._notifier:
hub = None
try:
self._capture_hub(True) # Must create, we need it.
hub = self._capture_hub(False) # Must create, we need it.
except InvalidThreadUseError:
# The current hub doesn't match self.hub. That's OK,
# we still want to start the notifier in the thread running
# self.hub (because the links probably contains greenlet.switch
# calls valid only in that hub)
pass
self._notifier = self.hub.loop.run_callback(self._notify_links, [])
if hub is not None:
self._notifier = hub.loop.run_callback(self._notify_links, [])
else:
# Hmm, no hub. We must be the only thing running. Then its OK
# to just directly call the callbacks.
self._notifier = 1
try:
self._notify_links([])
finally:
self._notifier = None
def _notify_link_list(self, links):
# The core of the _notify_links method to notify
......@@ -201,6 +216,9 @@ class AbstractLinkable(object):
only_while_ready = not self._notify_all
final_link = links[-1]
done = set() # of ids
hub = self.hub
if hub is None:
hub = get_hub_if_exists()
while links: # remember this can be mutated
if only_while_ready and not self.ready():
break
......@@ -222,7 +240,11 @@ class AbstractLinkable(object):
except: # pylint:disable=bare-except
# We're running in the hub, errors must not escape.
self.hub.handle_error((link, self), *sys.exc_info())
if hub is not None:
hub.handle_error((link, self), *sys.exc_info())
else:
import traceback
traceback.print_exc()
if link is final_link:
break
......
......@@ -51,13 +51,10 @@ cdef class AbstractLinkable(object):
cpdef unlink(self, callback)
cdef _check_and_notify(self)
cdef int _capture_hub(self, bint create) except -1
cdef SwitchOutGreenletWithLoop _capture_hub(self, bint create)
cdef __wait_to_be_notified(self, bint rawlink)
cdef void _quiet_unlink_all(self, obj) # suppress exceptions
cdef _allocate_lock(self)
cdef greenlet _getcurrent(self)
cdef int _switch_to_hub(self, the_hub) except -1
@cython.nonecheck(False)
......@@ -72,3 +69,8 @@ cdef class AbstractLinkable(object):
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, bint waited, bint wait_success)
cdef _wait(self, timeout=*)
# Unreleated utilities
cdef _allocate_lock(self)
cdef greenlet _getcurrent(self)
cdef _get_thread_ident(self)
......@@ -5,16 +5,22 @@ from gevent._gevent_c_abstract_linkable cimport AbstractLinkable
from gevent._gevent_c_hub_local cimport get_hub_if_exists
from gevent._gevent_c_hub_local cimport get_hub_noargs as get_hub
cdef Timeout
cdef InvalidThreadUseError
cdef LoopExit
cdef spawn_raw
cdef Timeout
cdef _native_sleep
cdef monotonic
cdef spawn_raw
cdef class _LockReleaseLink(object):
cdef object lock
cdef class Semaphore(AbstractLinkable):
cdef public int counter
cdef long _multithreaded
cpdef bint locked(self)
cpdef int release(self) except -1000
# We don't really want this to be public, but
......@@ -38,6 +44,15 @@ cdef class Semaphore(AbstractLinkable):
cdef __acquire_from_other_thread(self, tuple args, bint blocking, timeout)
cpdef __acquire_from_other_thread_cb(self, list results, bint blocking, timeout, thread_lock)
cdef __add_link(self, link)
cdef __acquire_using_two_hubs(self,
SwitchOutGreenletWithLoop hub_for_this_thread,
current_greenlet,
timeout)
cdef __acquire_using_other_hub(self, SwitchOutGreenletWithLoop owning_hub, timeout)
cdef bint __acquire_without_hubs(self, timeout)
cdef bint __spin_on_native_lock(self, thread_lock, timeout)
cdef class BoundedSemaphore(Semaphore):
cdef readonly int _initial_value
......
......@@ -16,8 +16,9 @@ __all__ = [
from time import sleep as _native_sleep
from gevent.exceptions import LoopExit
from gevent._compat import monotonic
from gevent.exceptions import InvalidThreadUseError
from gevent.exceptions import LoopExit
from gevent.timeout import Timeout
def _get_linkable():
......@@ -30,7 +31,16 @@ from gevent._hub_local import get_hub_if_exists
from gevent._hub_local import get_hub
from gevent.hub import spawn_raw
class _LockReleaseLink(object):
__slots__ = (
'lock',
)
def __init__(self, lock):
self.lock = lock
def __call__(self, _):
self.lock.release()
class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
"""
......@@ -61,10 +71,17 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.
.. versionchanged:: NEXT
Improved support for multi-threaded usage. When multi-threaded usage is detected,
instances will no longer create the thread's hub if it's not present.
"""
__slots__ = (
'counter',
# Integer. Set to 0 initially. Set to the ident of the first
# thread that acquires us. If we later see a different thread
# ident, set to -1.
'_multithreaded',
)
def __init__(self, value=1, hub=None):
......@@ -73,6 +90,7 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__(hub)
self._notify_all = False
self._multithreaded = 0
def __str__(self):
return '<%s at 0x%x counter=%s _links[%s]>' % (
......@@ -176,21 +194,29 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
the semaphore was acquired, False will be returned. (Note that this can still
raise a ``Timeout`` exception, if some other caller had already started a timer.)
"""
# pylint:disable=too-many-return-statements,too-many-branches
# Sadly, the body of this method is rather complicated.
if self._multithreaded == 0:
self._multithreaded = self._get_thread_ident()
elif self._multithreaded != self._get_thread_ident():
self._multithreaded = -1
# We conceptually now belong to the hub of the thread that
# called this, whether or not we have to block. Note that we
# cannot force it to be created yet, because Semaphore is used
# by importlib.ModuleLock which is used when importing the hub
# itself! This also checks for cross-thread issues.
invalid_thread_use = None
try:
self._capture_hub(False)
except InvalidThreadUseError as e:
# My hub belongs to some other thread. We didn't release the GIL/object lock
# by raising the exception, so we know this is still true.
args = e.args
invalid_thread_use = e.args
e = None
if not self.counter and blocking:
# We would need to block. So coordinate with the main hub.
return self.__acquire_from_other_thread(args, blocking, timeout)
return self.__acquire_from_other_thread(invalid_thread_use, blocking, timeout)
if self.counter > 0:
self.counter -= 1
......@@ -199,6 +225,19 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
if not blocking:
return False
if self._multithreaded != -1 and self.hub is None: # pylint:disable=access-member-before-definition
self.hub = get_hub() # pylint:disable=attribute-defined-outside-init
if self.hub is None and not invalid_thread_use:
# Someone else is holding us. There's not a hub here,
# nor is there a hub in that thread. We'll need to use regular locks.
# This will be unfair to yet a third thread that tries to use us with greenlets.
return self.__acquire_from_other_thread(
(None, None, self._getcurrent(), "NoHubs"),
blocking,
timeout
)
# self._wait may drop both the GIL and the _lock_lock.
# By the time we regain control, both have been reacquired.
try:
......@@ -237,8 +276,13 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
def __exit__(self, t, v, tb):
self.release()
def __add_link(self, link):
if not self._notifier:
self.rawlink(link)
else:
self._notifier.args[0].append(link)
def __acquire_from_other_thread(self, ex_args, blocking, timeout):
# pylint:disable=too-many-branches
assert blocking
# Some other hub owns this object. We must ask it to wake us
# up. In general, we can't use a Python-level ``Lock`` because
......@@ -248,6 +292,9 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
#
# So we need to do so in a way that cooperates with *two*
# hubs. That's what an async watcher is built for.
#
# Of course, if we don't actually have two hubs, then we must find some other
# solution. That involves using a lock.
# We have to take an action that drops the GIL and drops the object lock
# to allow the main thread (the thread for our hub) to advance.
......@@ -255,45 +302,27 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
hub_for_this_thread = ex_args[1]
current_greenlet = ex_args[2]
if hub_for_this_thread is None and timeout is None:
if owning_hub is None and hub_for_this_thread is None:
return self.__acquire_without_hubs(timeout)
if hub_for_this_thread is None:
# Probably a background worker thread. We don't want to create
# the hub if not needed, and since it didn't exist there are no
# other greenlets that we could yield to anyway, so there's nothing
# to block and no reason to try to avoid blocking, so using a native
# lock is the simplest way to go.
thread_lock = self._allocate_lock()
thread_lock.acquire()
results = []
return self.__acquire_using_other_hub(owning_hub, timeout)
owning_hub.loop.run_callback(
spawn_raw,
self.__acquire_from_other_thread_cb,
results,
blocking,
timeout,
thread_lock)
# This can't actually do anything until we drop the object lock.
self._drop_lock_for_switch_out()
# We MUST use a blocking acquire here, or at least be sure we keep going
# until we acquire it. If we timed out waiting here,
# just before the callback runs, then we would be out of sync.
# However, Python 2 has terrible behaviour where lock acquires can't
# be interrupted, so we use a spin loop
try:
while not thread_lock.acquire(0):
_native_sleep(0.005)
finally:
self._acquire_lock_for_switch_in()
return results[0]
# We either already had a hub, or we wanted a timeout, in which case
# we need to use the hub.
if hub_for_this_thread is None:
hub_for_this_thread = get_hub()
# We have a hub we don't want to block. Use an async watcher
# and ask the next releaser of this object to wake us up.
return self.__acquire_using_two_hubs(hub_for_this_thread,
current_greenlet,
timeout)
def __acquire_using_two_hubs(self,
hub_for_this_thread,
current_greenlet,
timeout):
# Allocating and starting the watcher *could* release the GIL.
# with the libev corcext, allocating won't, but starting briefly will.
# With other backends, allocating might, and starting might also.
......@@ -312,10 +341,7 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
assert self.counter >= 0, (self,)
return True
if not self._notifier:
self.rawlink(send)
else:
self._notifier.args[0].append(send)
self.__add_link(send)
# Releases the object lock
self._switch_to_hub(hub_for_this_thread)
......@@ -343,6 +369,76 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
thread_lock.release()
return result
def __acquire_using_other_hub(self, owning_hub, timeout):
assert owning_hub is not get_hub_if_exists()
thread_lock = self._allocate_lock()
thread_lock.acquire()
results = []
owning_hub.loop.run_callback(
spawn_raw,
self.__acquire_from_other_thread_cb,
results,
1, # blocking,
timeout, # timeout,
thread_lock)
# We MUST use a blocking acquire here, or at least be sure we keep going
# until we acquire it. If we timed out waiting here,
# just before the callback runs, then we would be out of sync.
self.__spin_on_native_lock(thread_lock, None)
return results[0]
def __acquire_without_hubs(self, timeout):
thread_lock = self._allocate_lock()
thread_lock.acquire()
absolute_expiration = 0
begin = 0
if timeout:
absolute_expiration = monotonic() + timeout
# Cython won't compile a lambda here
link = _LockReleaseLink(thread_lock)
while 1:
self.__add_link(link)
if absolute_expiration:
begin = monotonic()
got_native = self.__spin_on_native_lock(thread_lock, timeout)
self._quiet_unlink_all(link)
if got_native:
if self.acquire(0):
return True
if absolute_expiration:
now = monotonic()
if now >= absolute_expiration:
return False
duration = now - begin
timeout -= duration
if timeout <= 0:
return False
def __spin_on_native_lock(self, thread_lock, timeout):
expiration = 0
if timeout:
expiration = monotonic() + timeout
self._drop_lock_for_switch_out()
try:
# TODO: When timeout is given and the lock supports that
# (Python 3), pass that.
# Python 2 has terrible behaviour where lock acquires can't
# be interrupted, so we use a spin loop
while not thread_lock.acquire(0):
if expiration and monotonic() >= expiration:
return False
_native_sleep(0.001)
return True
finally:
self._acquire_lock_for_switch_in()
class BoundedSemaphore(Semaphore):
"""
BoundedSemaphore(value=1) -> BoundedSemaphore
......@@ -356,6 +452,10 @@ class BoundedSemaphore(Semaphore):
If not given, *value* defaults to 1.
"""
__slots__ = (
'_initial_value',
)
#: For monkey-patching, allow changing the class of error we raise
_OVER_RELEASE_ERROR = ValueError
......@@ -370,7 +470,6 @@ class BoundedSemaphore(Semaphore):
"""
if self.counter >= self._initial_value:
raise self._OVER_RELEASE_ERROR("Semaphore released too many times")
counter = Semaphore.release(self)
# When we are absolutely certain that no one holds this semaphore,
# release our hub and go back to floating. This assists in cross-thread
......
......@@ -294,6 +294,10 @@ class loop(AbstractLoop):
libev.ev_timer_stop(self._timer0)
def _setup_for_run_callback(self):
# XXX: libuv needs to start the callback timer to be sure
# that the loop wakes up and calls this. Our C version doesn't
# do this.
# self._start_callback_timer()
self.ref() # we should go through the loop now
def destroy(self):
......
......@@ -22,31 +22,36 @@ from functools import wraps
def wrap_error_fatal(method):
import gevent
system_error = gevent.get_hub().SYSTEM_ERROR
from gevent._hub_local import get_hub_class
system_error = get_hub_class().SYSTEM_ERROR
@wraps(method)
def wrapper(self, *args, **kwargs):
# XXX should also be able to do gevent.SYSTEM_ERROR = object
# which is a global default to all hubs
gevent.get_hub().SYSTEM_ERROR = object
get_hub_class().SYSTEM_ERROR = object
try:
return method(self, *args, **kwargs)
finally:
gevent.get_hub().SYSTEM_ERROR = system_error
get_hub_class().SYSTEM_ERROR = system_error
return wrapper
def wrap_restore_handle_error(method):
import gevent
old = gevent.get_hub().handle_error
from gevent._hub_local import get_hub_if_exists
from gevent import getcurrent
@wraps(method)
def wrapper(self, *args, **kwargs):
try:
return method(self, *args, **kwargs)
finally:
gevent.get_hub().handle_error = old
# Remove any customized handle_error, if set on the
# instance.
try:
del get_hub_if_exists().handle_error
except AttributeError:
pass
if self.peek_error()[0] is not None:
gevent.getcurrent().throw(*self.peek_error()[1:])
getcurrent().throw(*self.peek_error()[1:])
return wrapper
......@@ -16,7 +16,6 @@ from gevent.lock import BoundedSemaphore
import gevent.testing as greentest
from gevent.testing import timing
from gevent.testing import flaky
class TestSemaphore(greentest.TestCase):
......@@ -132,6 +131,7 @@ class TestSemaphoreMultiThread(greentest.TestCase):
acquired, exc_info,
**thread_acquire_kwargs
))
t.daemon = True
t.start()
thread_running.wait(10) # implausibly large time
if release:
......@@ -151,6 +151,14 @@ class TestSemaphoreMultiThread(greentest.TestCase):
self.assertEqual(acquired, [True])
if not release and thread_acquire_kwargs.get("timeout"):
# Spin the loop to be sure that the timeout has a chance to
# process. Interleave this with something that drops the GIL
# so the background thread has a chance to notice that.
for _ in range(3):
gevent.idle()
if thread_acquired.wait(timing.LARGE_TICK):
break
thread_acquired.wait(timing.LARGE_TICK * 5)
if require_thread_acquired_to_finish:
self.assertTrue(thread_acquired.is_set())
......@@ -210,9 +218,12 @@ class TestSemaphoreMultiThread(greentest.TestCase):
acquired, exc_info,
timeout=timing.LARGE_TICK
))
thread.daemon = True
gevent.idle()
sem.release()
glet.join()
for _ in range(3):
gevent.idle()
thread.join(timing.LARGE_TICK)
self.assertEqual(glet.value, True)
......@@ -260,13 +271,20 @@ class TestSemaphoreMultiThread(greentest.TestCase):
sem.acquire(*acquire_args)
sem.release()
results[ix] = i
if not create_hub:
# We don't artificially create the hub.
self.assertIsNone(
get_hub_if_exists(),
(get_hub_if_exists(), ix, i)
)
if create_hub and i % 10 == 0:
gevent.sleep(timing.SMALLEST_RELIABLE_DELAY)
elif i % 100 == 0:
native_sleep(timing.SMALLEST_RELIABLE_DELAY)
except Exception as ex: # pylint:disable=broad-except
import traceback; traceback.print_exc()
results[ix] = ex
results[ix] = str(ex)
ex = None
finally:
hub = get_hub_if_exists()
if hub is not None:
......@@ -285,23 +303,14 @@ class TestSemaphoreMultiThread(greentest.TestCase):
while t1.is_alive() or t2.is_alive():
cur = list(results)
t1.join(2)
t2.join(2)
t1.join(7)
t2.join(7)
if cur == results:
# Hmm, after two seconds, no progress
print("No progress!", cur, results, t1, t2)
from gevent.util import print_run_info
print_run_info()
run = False
break
try:
self.assertEqual(results, [count - 1, count - 1])
except AssertionError:
if greentest.PY2:
flaky.reraiseFlakyTestRaceCondition()
else:
raise
def test_dueling_threads_timeout(self):
self.test_dueling_threads((True, 4))
......
......@@ -116,14 +116,7 @@ class LockType(BoundedSemaphore):
if timeout > self._TIMEOUT_MAX:
raise OverflowError('timeout value is too large')
acquired = BoundedSemaphore.acquire(self, 0)
if not acquired and getcurrent() is not get_hub_if_exists() and blocking and not timeout:
# If we would block forever, and we're not in the hub, and a trivial non-blocking
# check didn't get us the lock, then try to run pending callbacks that might
# release the lock.
sleep()
if not acquired:
try:
acquired = BoundedSemaphore.acquire(self, blocking, timeout)
except LoopExit:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment