Commit 35f6301d authored by Raymond Hettinger's avatar Raymond Hettinger Committed by GitHub

bpo-10978: Semaphores can release multiple threads at a time (GH-15588)

parent 0dac68f1
...@@ -802,11 +802,14 @@ Semaphores also support the :ref:`context management protocol <with-locks>`. ...@@ -802,11 +802,14 @@ Semaphores also support the :ref:`context management protocol <with-locks>`.
.. versionchanged:: 3.2 .. versionchanged:: 3.2
The *timeout* parameter is new. The *timeout* parameter is new.
.. method:: release() .. method:: release(n=1)
Release a semaphore, incrementing the internal counter by *n*. When it
was zero on entry and other threads are waiting for it to become larger
than zero again, wake up *n* of those threads.
Release a semaphore, incrementing the internal counter by one. When it .. versionchanged:: 3.9
was zero on entry and another thread is waiting for it to become larger Added the *n* parameter to release multiple waiting threads at once.
than zero again, wake up that thread.
.. class:: BoundedSemaphore(value=1) .. class:: BoundedSemaphore(value=1)
......
...@@ -663,6 +663,38 @@ class BaseSemaphoreTests(BaseTestCase): ...@@ -663,6 +663,38 @@ class BaseSemaphoreTests(BaseTestCase):
b.wait_for_finished() b.wait_for_finished()
self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1)) self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
def test_multirelease(self):
sem = self.semtype(7)
sem.acquire()
results1 = []
results2 = []
phase_num = 0
def f():
sem.acquire()
results1.append(phase_num)
sem.acquire()
results2.append(phase_num)
b = Bunch(f, 10)
b.wait_for_started()
while len(results1) + len(results2) < 6:
_wait()
self.assertEqual(results1 + results2, [0] * 6)
phase_num = 1
sem.release(7)
while len(results1) + len(results2) < 13:
_wait()
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
phase_num = 2
sem.release(6)
while len(results1) + len(results2) < 19:
_wait()
self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
# The semaphore is still locked
self.assertFalse(sem.acquire(False))
# Final release, to let the last thread finish
sem.release()
b.wait_for_finished()
def test_try_acquire(self): def test_try_acquire(self):
sem = self.semtype(2) sem = self.semtype(2)
self.assertTrue(sem.acquire(False)) self.assertTrue(sem.acquire(False))
......
...@@ -439,16 +439,19 @@ class Semaphore: ...@@ -439,16 +439,19 @@ class Semaphore:
__enter__ = acquire __enter__ = acquire
def release(self): def release(self, n=1):
"""Release a semaphore, incrementing the internal counter by one. """Release a semaphore, incrementing the internal counter by one or more.
When the counter is zero on entry and another thread is waiting for it When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread. to become larger than zero again, wake up that thread.
""" """
if n < 1:
raise ValueError('n must be one or more')
with self._cond: with self._cond:
self._value += 1 self._value += n
self._cond.notify() for i in range(n):
self._cond.notify()
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.release() self.release()
...@@ -475,8 +478,8 @@ class BoundedSemaphore(Semaphore): ...@@ -475,8 +478,8 @@ class BoundedSemaphore(Semaphore):
Semaphore.__init__(self, value) Semaphore.__init__(self, value)
self._initial_value = value self._initial_value = value
def release(self): def release(self, n=1):
"""Release a semaphore, incrementing the internal counter by one. """Release a semaphore, incrementing the internal counter by one or more.
When the counter is zero on entry and another thread is waiting for it When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread. to become larger than zero again, wake up that thread.
...@@ -485,11 +488,14 @@ class BoundedSemaphore(Semaphore): ...@@ -485,11 +488,14 @@ class BoundedSemaphore(Semaphore):
raise a ValueError. raise a ValueError.
""" """
if n < 1:
raise ValueError('n must be one or more')
with self._cond: with self._cond:
if self._value >= self._initial_value: if self._value + n > self._initial_value:
raise ValueError("Semaphore released too many times") raise ValueError("Semaphore released too many times")
self._value += 1 self._value += n
self._cond.notify() for i in range(n):
self._cond.notify()
class Event: class Event:
......
Semaphores and BoundedSemaphores can now release more than one waiting
thread at a time.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment