Commit 1389dc16 authored by Denis Bilenko's avatar Denis Bilenko

Pool: use Semaphore instead of Event for implementing wait_available(); add spawn_async() method

parent b56e5709
......@@ -4,6 +4,7 @@ from gevent.hub import GreenletExit, getcurrent
from gevent.greenlet import joinall, Greenlet
from gevent.timeout import Timeout
from gevent.event import Event
from gevent.coros import Semaphore, DummySemaphore
__all__ = ['GreenletSet', 'Pool']
......@@ -197,11 +198,15 @@ class Pool(GreenletSet):
self.size = size
if greenlet_class is not None:
self.greenlet_class = greenlet_class
self._available_event = Event()
self._available_event.set()
if size is None:
self._semaphore = DummySemaphore()
else:
self._semaphore = Semaphore(size)
self._semaphore.rawlink(self._on_available)
self._waiting = []
def wait_available(self):
self._available_event.wait()
self._semaphore.wait()
def full(self):
return self.free_count() <= 0
......@@ -216,48 +221,71 @@ class Pool(GreenletSet):
self.greenlets.add(greenlet)
def start(self, greenlet):
self._available_event.wait()
self.add(greenlet)
self._semaphore.acquire()
try:
self.add(greenlet)
except:
self._semaphore.release()
raise
greenlet.start()
if self.full():
self._available_event.clear()
def spawn(self, *args, **kwargs):
self._available_event.wait()
greenlet = self.greenlet_class.spawn(*args, **kwargs)
self.add(greenlet)
self._semaphore.acquire()
try:
greenlet = self.greenlet_class.spawn(*args, **kwargs)
self.add(greenlet)
except:
self._semaphore.release()
raise
return greenlet
def _on_available(self, _semaphore):
if self._waiting:
if _semaphore.acquire(blocking=False):
greenlet = self._waiting.pop()
self.start(greenlet)
def spawn_async(self, *args, **kwargs):
greenlet = self.greenlet_class(*args, **kwargs)
if self.full():
self._available_event.clear()
self._waiting.append(greenlet)
else:
greenlet.start()
return greenlet
def spawn_link(self, *args, **kwargs):
self._available_event.wait()
greenlet = self.greenlet_class.spawn_link(*args, **kwargs)
self.add(greenlet)
if self.full():
self._available_event.clear()
self._semaphore.acquire()
try:
greenlet = self.greenlet_class.spawn_link(*args, **kwargs)
self.add(greenlet)
except:
self._semaphore.release()
raise
return greenlet
def spawn_link_value(self, *args, **kwargs):
self._available_event.wait()
greenlet = self.greenlet_class.spawn_link_value(*args, **kwargs)
self.add(greenlet)
if self.full():
self._available_event.clear()
self._semaphore.acquire()
try:
greenlet = self.greenlet_class.spawn_link_value(*args, **kwargs)
self.add(greenlet)
except:
self._semaphore.release()
raise
return greenlet
def spawn_link_exception(self, *args, **kwargs):
self._available_event.wait()
greenlet = self.greenlet_class.spawn_link_exception(*args, **kwargs)
self.add(greenlet)
if self.full():
self._available_event.clear()
self._semaphore.acquire()
try:
greenlet = self.greenlet_class.spawn_link_exception(*args, **kwargs)
self.add(greenlet)
except:
self._semaphore.release()
raise
return greenlet
def discard(self, greenlet):
GreenletSet.discard(self, greenlet)
if not self.full():
self._available_event.set()
self._semaphore.release()
def get_values(greenlets):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment