Commit 91d3a519 authored by Jason Madden's avatar Jason Madden

Improve compatibility with c.f.wait/as_completed.

parent f7ffc65b
......@@ -345,12 +345,49 @@ else:
__all__.append("ThreadPoolExecutor")
from gevent.timeout import Timeout as GTimeout
from gevent._util import Lazy
from concurrent.futures import _base as cfb
class FutureProxy(object):
class _FutureProxy(object):
def __init__(self, asyncresult):
self.asyncresult = asyncresult
# Internal implementation details of a c.f.Future
@Lazy
def _condition(self):
from gevent import monkey
if monkey.is_module_patched('threading') or self.done():
import threading
return threading.Condition()
# We can only properly work with conditions
# when we've been monkey-patched. This is necessary
# for the wait/as_completed module functions.
raise AttributeError("_condition")
@Lazy
def _waiters(self):
self.asyncresult.rawlink(self.__when_done)
return []
def __when_done(self, _):
# We should only be called when _waiters has
# already been accessed.
waiters = self.__dict__['_waiters']
for w in waiters:
if self.successful():
w.add_result(self)
else:
w.add_exception(self)
@property
def _state(self):
if self.done():
return cfb.FINISHED
return cfb.RUNNING
def set_running_or_notify_cancel(self):
# Does nothing, not even any consistency checks. It's
# meant to be internal to the executor and we don't use it.
......@@ -375,12 +412,21 @@ else:
else:
def wrap(f):
# we're called with the async result, but
# be sure to pass in ourself
# be sure to pass in ourself. Also automatically
# unlink ourself so that we don't get called multiple
# times.
try:
fn(self)
except Exception: # pylint: disable=broad-except
f.hub.print_exception((fn, self), *sys.exc_info())
self.rawlink(wrap)
finally:
self.unlink(wrap)
self.asyncresult.rawlink(wrap)
def rawlink(self, fn):
def wrap(_aresult):
return fn(self)
self.asyncresult.rawlink(wrap)
def __str__(self):
return str(self.asyncresult)
......@@ -393,7 +439,17 @@ else:
A version of :class:`concurrent.futures.ThreadPoolExecutor` that
always uses native threads, even when threading is monkey-patched.
The ``Future`` objects returned from this object can be used
with gevent waiting primitives like :func:`gevent.wait`.
.. caution:: If threading is *not* monkey-patched, then the ``Future``
objects returned by this object are not guaranteed to work with
:func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
The individual blocking methods like :meth:`~concurrent.futures.Future.result`
and :meth:`~concurrent.futures.Future.exception` will always work.
.. versionadded:: 1.2a1
This is a provisional API.
"""
def __init__(self, max_workers):
......@@ -403,7 +459,7 @@ else:
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
future = self._threadpool.spawn(fn, *args, **kwargs)
return FutureProxy(future)
return _FutureProxy(future)
def shutdown(self, wait=True):
super(ThreadPoolExecutor, self).shutdown(wait)
......
......@@ -433,7 +433,8 @@ class TestRefCount(TestCase):
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
from concurrent.futures import TimeoutError as FutureTimeoutError
#from concurrent.futures import CancelledError as FutureCancelledError
from concurrent.futures import wait as cf_wait
from concurrent.futures import as_completed as cf_as_completed
from gevent import monkey
......@@ -457,10 +458,11 @@ if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
return 42
def callback(future):
future.calledback = True
future.calledback += 1
raise Exception("Expected, ignored")
future = pool.submit(fn)
future.calledback = 0
future.add_done_callback(callback)
self.assertRaises(FutureTimeoutError, future.result, timeout=0.001)
......@@ -478,7 +480,7 @@ if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
# Make sure the notifier has a chance to run so the call back
# gets called
gevent.sleep()
self.assertEqual(getattr(future, 'calledback', None), True)
self.assertEqual(future.calledback, 1)
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
......@@ -487,5 +489,71 @@ if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
future.add_done_callback(lambda f: calledback.append(True))
self.assertEqual(calledback, [True])
# We can wait on the finished future
done, _not_done = cf_wait((future,))
self.assertEqual(list(done), [future])
self.assertEqual(list(cf_as_completed((future,))), [future])
# Doing so does not call the callback again
self.assertEqual(future.calledback, 1)
# even after a trip around the event loop
gevent.sleep()
self.assertEqual(future.calledback, 1)
def test_future_wait_module_function(self):
# Instead of waiting on the result, we can wait
# on the future using the module functions
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool = self.ClassUnderTest(1)
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn)
if self.MONKEY_PATCHED:
# Things work as expected when monkey-patched
_done, not_done = cf_wait((future,), timeout=0.001)
self.assertEqual(list(not_done), [future])
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
done, _not_done = cf_wait((future,))
self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
else:
# When not monkey-patched, raises an AttributeError
self.assertRaises(AttributeError, cf_wait, (future,))
def test_future_wait_gevent_function(self):
# The future object can be waited on with gevent functions.
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool = self.ClassUnderTest(1)
def fn():
gevent.sleep(0.5)
return 42
future = pool.submit(fn)
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
done = gevent.wait((future,))
self.assertEqual(list(done), [future])
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment