Commit f7ffc65b authored by Jason Madden's avatar Jason Madden

Improve the Future returned by the ThreadPoolExecutor.

It now works as expected when both patched and not patched. It is
cooperative with greenlets in both cases. Test this.
parent 6d1818b2
......@@ -421,3 +421,23 @@ class AsyncResult(_AbstractLinkable):
self.set(source.value)
else:
self.set_exception(source.exception, getattr(source, 'exc_info', None))
# Methods to make us more like concurrent.futures.Future
def result(self, timeout=None):
return self.get(timeout=timeout)
set_result = set
def done(self):
return self.ready()
# we don't support cancelling
def cancel(self):
return False
def cancelled(self):
return False
# exception is a method, we use it as a property
......@@ -344,6 +344,50 @@ except ImportError:
else:
__all__.append("ThreadPoolExecutor")
from gevent.timeout import Timeout as GTimeout
class FutureProxy(object):
def __init__(self, asyncresult):
self.asyncresult = asyncresult
def set_running_or_notify_cancel(self):
# Does nothing, not even any consistency checks. It's
# meant to be internal to the executor and we don't use it.
return
def result(self, timeout=None):
try:
return self.asyncresult.result(timeout=timeout)
except GTimeout:
raise concurrent.futures.TimeoutError()
def exception(self, timeout=None):
try:
self.asyncresult.get(timeout=timeout)
return self.asyncresult.exception
except GTimeout:
raise concurrent.futures.TimeoutError()
def add_done_callback(self, fn):
if self.done():
fn(self)
else:
def wrap(f):
# we're called with the async result, but
# be sure to pass in ourself
try:
fn(self)
except Exception: # pylint: disable=broad-except
f.hub.print_exception((fn, self), *sys.exc_info())
self.rawlink(wrap)
def __str__(self):
return str(self.asyncresult)
def __getattr__(self, name):
return getattr(self.asyncresult, name)
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"""
A version of :class:`concurrent.futures.ThreadPoolExecutor` that
......@@ -357,13 +401,9 @@ else:
self._threadpool = ThreadPool(max_workers)
def submit(self, fn, *args, **kwargs):
future = super(ThreadPoolExecutor, self).submit(fn, *args, **kwargs)
with self._shutdown_lock:
work_item = self._work_queue.get()
assert work_item.fn is fn
self._threadpool.spawn(work_item.run)
return future
future = self._threadpool.spawn(fn, *args, **kwargs)
return FutureProxy(future)
def shutdown(self, wait=True):
super(ThreadPoolExecutor, self).shutdown(wait)
......
......@@ -16,13 +16,18 @@ PYPY = hasattr(sys, 'pypy_version_info')
class TestCase(greentest.TestCase):
pool = None
def cleanup(self):
pool = getattr(self, 'pool', None)
if pool is not None:
pool.kill()
kill = getattr(pool, 'kill', None) or getattr(pool, 'shutdown', None)
kill()
del kill
del self.pool
class PoolBasicTests(TestCase):
def test_execute_async(self):
......@@ -427,11 +432,60 @@ class TestRefCount(TestCase):
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
from concurrent.futures import TimeoutError as FutureTimeoutError
#from concurrent.futures import CancelledError as FutureCancelledError
from gevent import monkey
class TestTPE(_AbstractPoolTest):
MAP_IS_GEN = True
ClassUnderTest = gevent.threadpool.ThreadPoolExecutor
MONKEY_PATCHED = False
def test_future(self):
self.assertEqual(monkey.is_module_patched('threading'),
self.MONKEY_PATCHED)
pool = self.pool = self.ClassUnderTest(1)
calledback = []
def fn():
gevent.sleep(0.5)
return 42
def callback(future):
future.calledback = True
raise Exception("Expected, ignored")
future = pool.submit(fn)
future.add_done_callback(callback)
self.assertRaises(FutureTimeoutError, future.result, timeout=0.001)
def spawned():
return 2016
spawned_greenlet = gevent.spawn(spawned)
# Whether or not we are monkey patched, the background
# greenlet we spawned got to run while we waited.
self.assertEqual(future.result(), 42)
self.assertTrue(future.done())
self.assertFalse(future.cancelled())
# Make sure the notifier has a chance to run so the call back
# gets called
gevent.sleep()
self.assertEqual(getattr(future, 'calledback', None), True)
self.assertTrue(spawned_greenlet.ready())
self.assertEqual(spawned_greenlet.value, 2016)
# Adding the callback again runs immediately
future.add_done_callback(lambda f: calledback.append(True))
self.assertEqual(calledback, [True])
if __name__ == '__main__':
greentest.main()
from __future__ import print_function
from gevent import monkey; monkey.patch_all()
import greentest
import gevent.threadpool
if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
from test__threadpool import TestTPE as _Base
class TestPatchedTPE(_Base):
MONKEY_PATCHED = True
del _Base
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment