Commit aa16985a authored by Jason Madden's avatar Jason Madden

Add failing test for #1437

parent 0321105e
The underlying Semaphore always behaves in an atomic fashion (as if
the GIL was not released) when PURE_PYTHON is set. Previously, it only
correctly did so on PyPy.
......@@ -57,4 +57,4 @@ cdef class AbstractLinkable(object):
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cdef _wait(self, timeout=*)
cpdef _wait(self, timeout=*)
......@@ -11,7 +11,7 @@ from __future__ import division
from __future__ import print_function
from gevent import _semaphore
from gevent import lock
from gevent import queue
......@@ -21,7 +21,7 @@ __all__ = [
]
locals()['Greenlet'] = __import__('gevent').Greenlet
locals()['Semaphore'] = _semaphore.Semaphore
locals()['Semaphore'] = lock.Semaphore
locals()['UnboundQueue'] = queue.UnboundQueue
......
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
###
# This file is ``gevent._semaphore`` so that it can be compiled by Cython
# individually. However, this is not the place to import from. Everyone,
# gevent internal code included, must import from ``gevent.lock``.
# The only exception are .pxd files which need access to the
# C code; the PURE_PYTHON things that have to happen and which are
# handled in ``gevent.lock``, do not apply to them.
###
from __future__ import print_function, absolute_import, division
__all__ = [
......@@ -45,6 +53,10 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
unlinks waiters before calling them.
"""
__slots__ = (
'counter',
)
def __init__(self, value=1, hub=None):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
......
......@@ -10,8 +10,12 @@ infinite bounds (:class:`DummySemaphore`), along with a reentrant lock
from __future__ import absolute_import
from gevent.hub import getcurrent
from gevent._compat import PYPY
from gevent._semaphore import Semaphore, BoundedSemaphore # pylint:disable=no-name-in-module,import-error
from gevent._compat import PURE_PYTHON
# This is the one exception to the rule of where to
# import Semaphore, obviously
from gevent import monkey
from gevent._semaphore import Semaphore
from gevent._semaphore import BoundedSemaphore
__all__ = [
......@@ -27,53 +31,60 @@ __all__ = [
# unsafe state (only when we _do_wait do we call back into Python and
# allow switching threads). Simulate that here through the use of a manual
# lock. (We use a separate lock for each semaphore to allow sys.settrace functions
# to use locks *other* than the one being traced.)
if PYPY:
# TODO: Need to use monkey.get_original?
try:
from _thread import allocate_lock as _allocate_lock # pylint:disable=import-error,useless-suppression
from _thread import get_ident as _get_ident # pylint:disable=import-error,useless-suppression
except ImportError:
# Python 2
from thread import allocate_lock as _allocate_lock # pylint:disable=import-error,useless-suppression
from thread import get_ident as _get_ident # pylint:disable=import-error,useless-suppression
_sem_lock = _allocate_lock()
def untraceable(f):
# Don't allow re-entry to these functions in a single thread, as can
# happen if a sys.settrace is used
def wrapper(self):
me = _get_ident()
try:
count = self._locking[me]
except KeyError:
count = self._locking[me] = 1
else:
count = self._locking[me] = count + 1
if count:
return
# to use locks *other* than the one being traced.) This, of course, must also
# hold for PURE_PYTHON mode when no optional C extensions are used.
try:
return f(self)
finally:
count = count - 1
if not count:
del self._locking[me]
else:
self._locking[me] = count
return wrapper
_allocate_lock, _get_ident = monkey.get_original(
('_thread', 'thread'),
('allocate_lock', 'get_ident')
)
class _OwnedLock(object):
def __init__(self):
self._owner = None
self._block = _allocate_lock()
self._locking = {}
self._count = 0
class _OwnedLock(object):
__slots__ = (
'_owner',
'_block',
'_locking',
'_count',
)
def __init__(self):
self._owner = None
self._block = _allocate_lock()
self._locking = {}
self._count = 0
# Don't allow re-entry to these functions in a single thread, as can
# happen if a sys.settrace is used.
def __begin(self):
# Return (me, count) if we should proceed, otherwise return
# None. The function should exit in that case.
# In either case, it must call __end.
me = _get_ident()
try:
count = self._locking[me]
except KeyError:
count = self._locking[me] = 1
else:
count = self._locking[me] = count + 1
return (me, count) if not count else (None, None)
def __end(self, me, count):
if me is None:
return
count = count - 1
if not count:
del self._locking[me]
else:
self._locking[me] = count
def __enter__(self):
me, lock_count = self.__begin()
try:
if me is None:
return
@untraceable
def acquire(self):
me = _get_ident()
if self._owner == me:
self._count += 1
return
......@@ -81,55 +92,70 @@ if PYPY:
self._owner = me
self._block.acquire()
self._count = 1
finally:
self.__end(me, lock_count)
def __exit__(self, t, v, tb):
self.release()
acquire = __enter__
def release(self):
me, lock_count = self.__begin()
try:
if me is None:
return
@untraceable
def release(self):
self._count = count = self._count - 1
if not count:
self._block.release()
self._owner = None
finally:
self.__end(me, lock_count)
class _AtomicSemaphore(Semaphore):
# Behaves as though the GIL was held for the duration of acquire, wait,
# and release, just as if we were in Cython.
#
# acquire, wait, and release all acquire the lock on entry and release it
# on exit. acquire and wait can call _do_wait, which must release it on entry
# on exit. acquire and wait can call _wait, which must release it on entry
# and re-acquire it for them on exit.
class _around(object):
__slots__ = ('before', 'after')
def __init__(self, before, after):
self.before = before
self.after = after
#
# Note that this does *NOT* make semaphores safe to use from multiple threads
__slots__ = (
'_lock_lock',
)
def __init__(self, *args, **kwargs):
self._lock_lock = _OwnedLock()
def __enter__(self):
self.before()
super(_AtomicSemaphore, self).__init__(*args, **kwargs)
def __exit__(self, t, v, tb):
self.after()
def _wait(self, *args, **kwargs):
self._lock_lock.release()
try:
return super(_AtomicSemaphore, self)._wait(*args, **kwargs)
finally:
self._lock_lock.acquire()
def _decorate(func, cmname):
# functools.wrap?
def wrapped(self, *args, **kwargs):
with getattr(self, cmname):
return func(self, *args, **kwargs)
return wrapped
def release(self):
with self._lock_lock:
return super(_AtomicSemaphore, self).release()
Semaphore._py3k_acquire = Semaphore.acquire = _decorate(Semaphore.acquire, '_lock_locked')
Semaphore.release = _decorate(Semaphore.release, '_lock_locked')
Semaphore.wait = _decorate(Semaphore.wait, '_lock_locked')
Semaphore._wait = _decorate(Semaphore._wait, '_lock_unlocked')
def acquire(self, blocking=True, timeout=None):
with self._lock_lock:
return super(_AtomicSemaphore, self).acquire(blocking, timeout)
_Sem_init = Semaphore.__init__
_py3k_acquire = acquire
def __init__(self, *args, **kwargs):
l = self._lock_lock = _OwnedLock()
self._lock_locked = _around(l.acquire, l.release)
self._lock_unlocked = _around(l.release, l.acquire)
def wait(self, timeout=None):
with self._lock_lock:
return super(_AtomicSemaphore, self).wait(timeout)
_Sem_init(self, *args, **kwargs)
Semaphore.__init__ = __init__
del _decorate
del untraceable
if PURE_PYTHON:
Semaphore = _AtomicSemaphore
class DummySemaphore(object):
......
......@@ -250,7 +250,9 @@ def get_original(mod_name, item_name):
retrieved.
:param str mod_name: The name of the standard library module,
e.g., ``'socket'``.
e.g., ``'socket'``. Can also be a sequence of standard library
modules giving alternate names to try, e.g., ``('thread', '_thread')``;
the first importable module will supply all *item_name* items.
:param item_name: A string or sequence of strings naming the
attribute(s) on the module ``mod_name`` to return.
......@@ -258,10 +260,22 @@ def get_original(mod_name, item_name):
``item_name`` or a sequence of original values if a
sequence was passed.
"""
mod_names = [mod_name] if isinstance(mod_name, string_types) else mod_name
if isinstance(item_name, string_types):
return _get_original(mod_name, [item_name])[0]
return _get_original(mod_name, item_name)
item_names = [item_name]
unpack = True
else:
item_names = item_name
unpack = False
for mod in mod_names:
try:
result = _get_original(mod, item_names)
except ImportError:
if mod is mod_names[-1]:
raise
else:
return result[0] if unpack else result
_NONE = object()
......
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from gevent import lock
import gevent.testing as greentest
if __name__ == '__main__':
greentest.main()
###
# This file is test__semaphore.py only for organization purposes.
# The public API,
# and the *only* correct place to import Semaphore --- even in tests ---
# is ``gevent.lock``, never ``gevent._semaphore``.
##
from __future__ import print_function
from __future__ import absolute_import
......@@ -6,16 +12,9 @@ import weakref
import gevent
import gevent.exceptions
from gevent.lock import Semaphore
from gevent.thread import allocate_lock
import gevent.testing as greentest
try:
from _thread import allocate_lock as std_allocate_lock
except ImportError: # Py2
from thread import allocate_lock as std_allocate_lock
# pylint:disable=broad-except
class TestSemaphore(greentest.TestCase):
......@@ -67,24 +66,54 @@ class TestSemaphore(greentest.TestCase):
s = Semaphore()
gevent.wait([s])
class TestLock(greentest.TestCase):
def test_release_unheld_lock(self):
std_lock = std_allocate_lock()
g_lock = allocate_lock()
try:
std_lock.release()
self.fail("Should have thrown an exception")
except Exception as e:
std_exc = e
class TestAcquireContended(greentest.TestCase):
# Tests that the object can be acquired correctly across
# multiple threads.
# Used as a base class.
try:
g_lock.release()
self.fail("Should have thrown an exception")
except Exception as e:
g_exc = e
self.assertIsInstance(g_exc, type(std_exc))
# See https://github.com/gevent/gevent/issues/1437
def _makeOne(self):
# Create an object that is associated with the current hub. If
# we don't do this now, it gets initialized lazily the first
# time it would have to block, which, in the event of threads,
# would be from an arbitrary thread.
return Semaphore(1, gevent.get_hub())
def test_acquire_in_one_then_another(self):
from gevent import monkey
self.assertFalse(monkey.is_module_patched('threading'))
import sys
import threading
sem = self._makeOne()
# Make future acquires block
print("acquiring", sem)
sem.acquire()
exc_info = []
def thread_main():
# XXX: When this is fixed, this will have to be modified
# to avoid deadlock, but being careful to still test
# the initial conditions (e.g., that this doesn't throw;
# we can't pass block=False because that bypasses the part
# that would throw.)
try:
sem.acquire()
except:
exc_info[:] = sys.exc_info()
t = threading.Thread(target=thread_main)
t.start()
t.join()
try:
self.assertEqual(exc_info, [])
finally:
exc_info = None
@greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase):
......
from __future__ import print_function
from __future__ import absolute_import
from gevent.thread import allocate_lock
import gevent.testing as greentest
try:
from _thread import allocate_lock as std_allocate_lock
except ImportError: # Py2
from thread import allocate_lock as std_allocate_lock
class TestLock(greentest.TestCase):
def test_release_unheld_lock(self):
std_lock = std_allocate_lock()
g_lock = allocate_lock()
with self.assertRaises(Exception) as exc:
std_lock.release()
std_exc = exc.exception
with self.assertRaises(Exception) as exc:
g_lock.release()
g_exc = exc.exception
self.assertIsInstance(g_exc, type(std_exc))
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment