Commit a1f646b2 authored by Jason Madden's avatar Jason Madden

Documentation for pool and semaphore. Add return types for some Cython...

Documentation for pool and semaphore. Add return types for some Cython semaphore functions in hopes of better code.
parent 326e563f
......@@ -4,16 +4,16 @@ cdef class Semaphore:
cdef readonly object _notifier
cdef public int _dirty
cpdef locked(self)
cpdef release(self)
cpdef bint locked(self)
cpdef int release(self)
cpdef rawlink(self, object callback)
cpdef unlink(self, object callback)
cpdef wait(self, object timeout=*)
cpdef acquire(self, int blocking=*, object timeout=*)
cpdef int wait(self, object timeout=*)
cpdef bint acquire(self, int blocking=*, object timeout=*)
cpdef __enter__(self)
cpdef __exit__(self, object t, object v, object tb)
cdef class BoundedSemaphore(Semaphore):
cdef readonly int _initial_value
cpdef release(self)
cpdef int release(self)
......@@ -34,11 +34,14 @@ class Semaphore(object):
return '<%s counter=%s _links[%s]>' % params
def locked(self):
"""Return a boolean indicating whether the semaphore can be acquired.
Most useful with binary semaphores."""
return self.counter <= 0
def release(self):
self.counter += 1
self._start_notify()
return self.counter
def _start_notify(self):
if self._links and self.counter > 0 and not self._notifier:
......@@ -79,54 +82,84 @@ class Semaphore(object):
pass
def wait(self, timeout=None):
"""
Wait until it is possible to acquire this semaphore, or until the optional
*timeout* elapses.
.. warning:: If this semaphore was initialized with a size of 0,
this method will block forever if no timeout is given.
:param float timeout: If given, specifies the maximum amount of seconds
this method will block.
:return: A number indicating how many times the semaphore can be acquired
before blocking.
"""
if self.counter > 0:
return self.counter
else:
switch = getcurrent().switch
self.rawlink(switch)
switch = getcurrent().switch
self.rawlink(switch)
try:
timer = Timeout.start_new(timeout)
try:
timer = Timeout.start_new(timeout)
try:
try:
result = get_hub().switch()
assert result is self, 'Invalid switch into Semaphore.wait(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is not timer:
raise
finally:
timer.cancel()
result = get_hub().switch()
assert result is self, 'Invalid switch into Semaphore.wait(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is not timer:
raise
finally:
self.unlink(switch)
timer.cancel()
finally:
self.unlink(switch)
return self.counter
def acquire(self, blocking=True, timeout=None):
"""
Acquire the semaphore.
.. warning:: If this semaphore was initialized with a size of 0,
this method will block forever (unless a timeout is given or blocking is
set to false).
:keyword bool blocking: If True (the default), this function will block
until the semaphore is acquired.
:keyword float timeout: If given, specifies the maximum amount of seconds
this method will block.
:return: A boolean indicating whether the semaphore was acquired.
If ``blocking`` is True and ``timeout`` is None (the default), then
(so long as this semaphore was initialized with a size greater than 0)
this will always return True. If a timeout was given, and it expired before
the semaphore was acquired, False will be returned.
"""
if self.counter > 0:
self.counter -= 1
return True
elif not blocking:
if not blocking:
return False
else:
switch = getcurrent().switch
self.rawlink(switch)
switch = getcurrent().switch
self.rawlink(switch)
try:
timer = Timeout.start_new(timeout)
try:
timer = Timeout.start_new(timeout)
try:
try:
result = get_hub().switch()
assert result is self, 'Invalid switch into Semaphore.acquire(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is timer:
return False
raise
finally:
timer.cancel()
result = get_hub().switch()
assert result is self, 'Invalid switch into Semaphore.acquire(): %r' % (result, )
except Timeout:
ex = sys.exc_info()[1]
if ex is timer:
return False
raise
finally:
self.unlink(switch)
self.counter -= 1
assert self.counter >= 0
return True
timer.cancel()
finally:
self.unlink(switch)
self.counter -= 1
assert self.counter >= 0
return True
_py3k_acquire = acquire # PyPy needs this; it must be static for Cython
......
# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
"""Managing greenlets in a group.
The :class:`Group` class in this module abstracts a group of running greenlets.
When a greenlet dies, it's automatically removed from the group.
The :class:`Pool` which a subclass of :class:`Group` provides a way to limit
concurrency: its :meth:`spawn <Pool.spawn>` method blocks if the number of
greenlets in the pool has already reached the limit, until there is a free slot.
"""
Managing greenlets in a group.
The :class:`Group` class in this module abstracts a group of running
greenlets. When a greenlet dies, it's automatically removed from the
group. All running greenlets in a group can be waited on with
:meth:`Group.joinall`, or all running greenlets can be killed with
:meth:`Group.kill`.
The :class:`Pool` class, which is a subclass of :class:`Group`,
provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
method blocks if the number of greenlets in the pool has already
reached the limit, until there is a free slot.
"""
from bisect import insort_right
......@@ -233,7 +238,8 @@ class GroupMappingMixin(object):
def map_async(self, func, iterable, callback=None):
"""
A variant of the map() method which returns a Greenlet object.
A variant of the map() method which returns a Greenlet object that is executing
the map function.
If callback is specified then it should be a callable which accepts a
single argument.
......@@ -308,10 +314,21 @@ class Group(GroupMappingMixin):
unlink(self._discard)
def start(self, greenlet):
"""
Start the un-started *greenlet* and add it to the collection of greenlets
this group is monitoring.
"""
self.add(greenlet)
greenlet.start()
def spawn(self, *args, **kwargs):
"""
Begin a new greenlet with the given arguments (which are passed
to the greenlet constructor) and add it to the collection of greenlets
this group is monitoring.
:return: The newly started greenlet.
"""
greenlet = self.greenlet_class(*args, **kwargs)
self.start(greenlet)
return greenlet
......@@ -399,6 +416,24 @@ class Failure(object):
class Pool(Group):
def __init__(self, size=None, greenlet_class=None):
"""
Create a new pool.
A pool is like a group, but the maximum number of members
is governed by the *size* parameter.
:keyword int size: If given, this non-negative integer is the
maximum count of active greenlets that will be allowed in
this pool. A few values have special significance:
* ``None`` (the default) places no limit on the number of
greenlets. This is useful when you need to track, but not limit,
greenlets, as with :class:`gevent.pywsgi.WSGIServer`
* ``0`` creates a pool that can never have any active greenlets. Attempting
to spawn in this pool will block forever. This is only useful
if an application uses :meth:`wait_available` with a timeout and checks
:meth:`free_count` before attempting to spawn.
"""
if size is not None and size < 0:
raise ValueError('size must not be negative: %r' % (size, ))
Group.__init__(self)
......@@ -410,13 +445,36 @@ class Pool(Group):
else:
self._semaphore = Semaphore(size)
def wait_available(self):
self._semaphore.wait()
def wait_available(self, timeout=None):
"""
Wait until it's possible to spawn a greenlet in this pool.
:param float timeout: If given, only wait the specified number
of seconds.
.. warning:: If the pool was initialized with a size of 0, this
method will block forever unless a timeout is given.
:return: A number indicating how many new greenlets can be put into
the pool without blocking.
.. versionchanged:: 1.1a3
Added the ``timeout`` parameter.
"""
return self._semaphore.wait(timeout=timeout)
def full(self):
"""
Return a boolean indicating whether this pool has any room for
members. (True if it does, False if it doesn't.)
"""
return self.free_count() <= 0
def free_count(self):
"""
Return a number indicating approximately how many more members
can be added to this pool.
"""
if self.size is None:
return 1
return max(0, self.size - len(self))
......
......@@ -397,6 +397,16 @@ class TestPoolUnlimit(TestPool):
size = None
class TestPool0(greentest.TestCase):
size = 0
def test_wait_full(self):
p = pool.Pool(size=0)
self.assertEqual(0, p.free_count())
self.assertTrue(p.full())
self.assertEqual(0, p.wait_available(timeout=0.01))
class TestJoinSleep(greentest.GenericWaitTestCase):
def wait(self, timeout):
......
......@@ -53,8 +53,10 @@ if sys.platform == 'win32':
if LEAKTEST:
FAILING_TESTS += ['FLAKY test__backdoor.py']
FAILING_TESTS += ['FLAKY test__os.py']
FAILING_TESTS += [
'FLAKY test__backdoor.py',
'FLAKY test__socket_errors.py'
]
if PYPY:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment