Commit 5843ff0b authored by Denis Bilenko's avatar Denis Bilenko

Semaphore/Lock: do not do get_hub() in __init__

because threading has a module-level lock(), it means that get_hub() was called by monkey.patch_all()

this is not good, in case use wants to fork off children, but don't actually use gevent in master or if
the user wants to configure the hub.

Also, PySemaphore is now removed (don't really see the point of maintaing the same code twice).
parent e783f8de
......@@ -21,8 +21,9 @@ class Semaphore(object):
raise ValueError("semaphore initial value must be >= 0")
self._links = []
self.counter = value
self.hub = get_hub()
self._notifier = self.hub.loop.callback()
self._notifier = None
# we don't want to do get_hub() here to allow module-level locks
# without initializing the hub
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._links))
......@@ -36,8 +37,8 @@ class Semaphore(object):
self._start_notify()
def _start_notify(self):
if self._links and self.counter > 0 and not self._notifier.active:
self._notifier.start(self._notify_links)
if self._links and self.counter > 0 and (self._notifier is None or not self._notifier.active):
self._notifier = get_hub().loop.run_callback(self._notify_links)
def _notify_links(self):
while True:
......@@ -48,7 +49,7 @@ class Semaphore(object):
try:
link(self)
except:
self.hub.handle_error((link, self), *sys.exc_info())
getcurrent().handle_error((link, self), *sys.exc_info())
if self._dirty:
break
if not self._dirty:
......@@ -83,7 +84,7 @@ class Semaphore(object):
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
result = get_hub().switch()
assert result is self, 'Invalid switch into Semaphore.wait(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
......@@ -108,7 +109,7 @@ class Semaphore(object):
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
result = get_hub().switch()
assert result is self, 'Invalid switch into Semaphore.acquire(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
......
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
"""Locking primitives"""
import sys
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout
from gevent.hub import getcurrent
from gevent._semaphore import Semaphore
__all__ = ['Semaphore', 'DummySemaphore', 'BoundedSemaphore', 'RLock']
class PySemaphore(object):
"""A semaphore manages a counter representing the number of release() calls minus the number of acquire() calls,
plus an initial value. The acquire() method blocks if necessary until it can return without making the counter
negative.
If not given, value defaults to 1."""
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._links = []
self.counter = value
self.hub = get_hub()
self._notifier = self.hub.loop.callback()
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._links))
return '<%s counter=%s _links[%s]>' % params
def locked(self):
return self.counter <= 0
def release(self):
self.counter += 1
self._start_notify()
def _start_notify(self):
if self._links and self.counter > 0 and not self._notifier.active:
self._notifier.start(self._notify_links)
def _notify_links(self):
while True:
self._dirty = False
for link in self._links:
if self.counter <= 0:
return
try:
link(self)
except:
self.hub.handle_error((link, self), *sys.exc_info())
if self._dirty:
break
if not self._dirty:
return
def rawlink(self, callback):
"""Register a callback to call when a counter is more than zero.
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be passed one argument: this instance.
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.append(callback)
self._dirty = True
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
try:
self._links.remove(callback)
self._dirty = True
except ValueError:
pass
def wait(self, timeout=None):
if self.counter > 0:
return self.counter
else:
switch = getcurrent().switch
self.rawlink(switch)
try:
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
assert result is self, 'Invalid switch into Semaphore.wait(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is not timer:
raise
finally:
timer.cancel()
finally:
self.unlink(switch)
return self.counter
def acquire(self, blocking=True, timeout=None):
if self.counter > 0:
self.counter -= 1
return True
elif not blocking:
return False
else:
switch = getcurrent().switch
self.rawlink(switch)
try:
timer = Timeout.start_new(timeout)
try:
try:
result = self.hub.switch()
assert result is self, 'Invalid switch into Semaphore.acquire(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is timer:
return False
raise
finally:
timer.cancel()
finally:
self.unlink(switch)
self.counter -= 1
assert self.counter >= 0
return True
def __enter__(self):
self.acquire()
def __exit__(self, typ, val, tb):
self.release()
try:
from gevent._semaphore import Semaphore
except ImportError:
Semaphore = PySemaphore
class DummySemaphore(object):
"""A Semaphore initialized with "infinite" initial value. Neither of its methods ever block."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment