Commit a671cdf6 authored by Ralf Schmitt's avatar Ralf Schmitt

fix issues with tracing and thread shutdown

python's threading module relies on the lock's class __exit__ to not
call into the tracing function set via sys.settrace until the lock
really is released. The only way to prevent this is to implement it in
C. So, we move coros.Semaphore to a cython module.

fixes the failing tests in test__threading_vs_settrace.py
parent 670f521a
......@@ -3,7 +3,7 @@
PYTHON ?= python
all: gevent/gevent.core.c gevent/gevent.ares.c
all: gevent/gevent.core.c gevent/gevent.ares.c gevent/gevent._semaphore.c
gevent/gevent.core.c: gevent/core.ppyx gevent/libev.pxd util/cythonpp.py
$(PYTHON) util/cythonpp.py -o gevent.core.c gevent/core.ppyx
......@@ -11,10 +11,14 @@ gevent/gevent.core.c: gevent/core.ppyx gevent/libev.pxd util/cythonpp.py
echo '#include "callbacks.c"' >> gevent.core.c
mv gevent.core.* gevent/
gevent/gevent.ares.c: gevent/*.pyx gevent/*.pxd
gevent/gevent.ares.c: gevent/ares.pyx gevent/core.pyx gevent/*.pxd
cython -o gevent.ares.c gevent/ares.pyx
mv gevent.ares.* gevent/
gevent/gevent._semaphore.c: gevent/_semaphore.pyx
cython -o gevent._semaphore.c gevent/_semaphore.pyx
mv gevent._semaphore.c gevent/
clean:
rm -f gevent.core.c gevent.core.h core.pyx gevent/gevent.core.c gevent/gevent.core.h gevent/core.pyx
rm -f gevent.ares.c gevent.ares.h gevent/gevent.ares.c gevent/gevent.ares.h
......
import sys
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout
__all__ = ['Semaphore']
class Semaphore(object):
"""A semaphore manages a counter representing the number of release() calls minus the number of acquire() calls,
plus an initial value. The acquire() method blocks if necessary until it can return without making the counter
negative.
If not given, value defaults to 1."""
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._links = []
self.counter = value
self.hub = get_hub()
self._notifier = self.hub.loop.callback()
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._links))
return '<%s counter=%s _links[%s]>' % params
def locked(self):
return self.counter <= 0
def release(self):
self.counter += 1
self._start_notify()
def _start_notify(self):
if self._links and self.counter > 0 and not self._notifier.active:
self._notifier.start(self._notify_links)
def _notify_links(self):
while True:
self._dirty = False
for link in self._links:
if self.counter <= 0:
return
try:
link(self)
except:
self.hub.handle_error((link, self), *sys.exc_info())
if self._dirty:
break
if not self._dirty:
return
def rawlink(self, callback):
"""Register a callback to call when a counter is more than zero.
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be passed one argument: this instance.
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.append(callback)
self._dirty = True
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
try:
self._links.remove(callback)
self._dirty = True
except ValueError:
pass
def wait(self, timeout=None):
if self.counter > 0:
return self.counter
else:
switch = getcurrent().switch
self.rawlink(switch)
try:
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
assert result is self, 'Invalid switch into Semaphore.wait(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is not timer:
raise
finally:
timer.cancel()
finally:
self.unlink(switch)
return self.counter
def acquire(self, blocking=True, timeout=None):
if self.counter > 0:
self.counter -= 1
return True
elif not blocking:
return False
else:
switch = getcurrent().switch
self.rawlink(switch)
try:
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
assert result is self, 'Invalid switch into Semaphore.acquire(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is timer:
return False
raise
finally:
timer.cancel()
finally:
self.unlink(switch)
self.counter -= 1
assert self.counter >= 0
return True
def __enter__(self):
self.acquire()
def __exit__(self, typ, val, tb):
self.release()
......@@ -9,7 +9,7 @@ from gevent.timeout import Timeout
__all__ = ['Semaphore', 'DummySemaphore', 'BoundedSemaphore', 'RLock']
class Semaphore(object):
class _Semaphore(object):
"""A semaphore manages a counter representing the number of release() calls minus the number of acquire() calls,
plus an initial value. The acquire() method blocks if necessary until it can return without making the counter
negative.
......@@ -129,6 +129,11 @@ class Semaphore(object):
def __exit__(self, typ, val, tb):
self.release()
try:
from gevent._semaphore import Semaphore
except ImportError:
Semaphore = _Semaphore
class DummySemaphore(object):
"""A Semaphore initialized with "infinite" initial value. Neither of its methods ever block."""
......
......@@ -59,6 +59,8 @@ ARES.optional = True
ext_modules = [CORE, ARES]
ext_modules.append(Extension(name="gevent._semaphore",
sources=["gevent/gevent._semaphore.c"]))
def make_universal_header(filename, *defines):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment