Commit b4db40b8 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1153 from gevent/threadpool-opts

Optimizations for threadpool
parents 277e34ca c21db37f
......@@ -42,6 +42,9 @@ Enhancements
- Hub objects now include the value of their ``name`` attribute in
their repr.
- Pools for greenlets and threads have lower overhead, especially for
``map``. See :pr:`1153`.
Monitoring and Debugging
------------------------
......
# -*- coding: utf-8 -*-
"""
Benchmarks for greenlet pool.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gevent.pool
import bench_threadpool
bench_threadpool.ThreadPool = gevent.pool.Pool
if __name__ == '__main__':
bench_threadpool.main()
# -*- coding: utf-8 -*-
"""
Benchmarks for thread pool.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import perf
from gevent.threadpool import ThreadPool
try:
xrange = xrange
except NameError:
xrange = range
def noop():
"Does nothing"
def identity(i):
return i
PAR_COUNT = 5
N = 20
def bench_apply(loops):
pool = ThreadPool(1)
t0 = perf.perf_counter()
for _ in xrange(loops):
for _ in xrange(N):
pool.apply(noop)
pool.join()
pool.kill()
return perf.perf_counter() - t0
def bench_spawn_wait(loops):
pool = ThreadPool(1)
t0 = perf.perf_counter()
for _ in xrange(loops):
for _ in xrange(N):
r = pool.spawn(noop)
r.get()
pool.join()
pool.kill()
return perf.perf_counter() - t0
def _map(pool, pool_func, loops):
data = [1] * N
t0 = perf.perf_counter()
# Must collect for imap to finish
for _ in xrange(loops):
list(pool_func(identity, data))
pool.join()
pool.kill()
return perf.perf_counter() - t0
def _ppool():
pool = ThreadPool(PAR_COUNT)
pool.size = PAR_COUNT
return pool
def bench_map_seq(loops):
pool = ThreadPool(1)
return _map(pool, pool.map, loops)
def bench_map_par(loops):
pool = _ppool()
return _map(pool, pool.map, loops)
def bench_imap_seq(loops):
pool = ThreadPool(1)
return _map(pool, pool.imap, loops)
def bench_imap_par(loops):
pool = _ppool()
return _map(pool, pool.imap, loops)
def bench_imap_un_seq(loops):
pool = ThreadPool(1)
return _map(pool, pool.imap_unordered, loops)
def bench_imap_un_par(loops):
pool = _ppool()
return _map(pool, pool.imap_unordered, loops)
def main():
runner = perf.Runner()
runner.bench_time_func('imap_unordered_seq',
bench_imap_un_seq)
runner.bench_time_func('imap_unordered_par',
bench_imap_un_par)
runner.bench_time_func('imap_seq',
bench_imap_seq)
runner.bench_time_func('imap_par',
bench_imap_par)
runner.bench_time_func('map_seq',
bench_map_seq)
runner.bench_time_func('map_par',
bench_map_par)
runner.bench_time_func('apply',
bench_apply)
runner.bench_time_func('spawn',
bench_spawn_wait)
if __name__ == '__main__':
main()
......@@ -8,14 +8,12 @@ This module is missing 'Thread' class, but includes 'Queue'.
from __future__ import absolute_import
from collections import deque
from itertools import islice as _islice
from gevent import monkey
from gevent._compat import thread_mod_name
__all__ = [
'Condition',
'Lock',
'Queue',
]
......@@ -26,14 +24,13 @@ start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name,
])
class Condition(object):
class _Condition(object):
# pylint:disable=method-hidden
def __init__(self, lock):
self.__lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
self.__waiters = []
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
......@@ -49,13 +46,12 @@ class Condition(object):
self._is_owned = lock._is_owned
except AttributeError:
pass
self.__waiters = []
def __enter__(self):
return self.__lock.__enter__()
def __exit__(self, *args):
return self.__lock.__exit__(*args)
def __exit__(self, t, v, tb):
return self.__lock.__exit__(t, v, tb)
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
......@@ -75,8 +71,7 @@ class Condition(object):
return True
def wait(self):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# The condition MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
......@@ -86,19 +81,15 @@ class Condition(object):
finally:
self._acquire_restore(saved_state)
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self.__waiters
waiters_to_notify = deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
def notify_one(self):
# The condition MUST be owned, but we don't check that.
try:
all_waiters.remove(waiter)
except ValueError:
waiter = self.__waiters.pop()
except IndexError:
# Nobody around
pass
else:
waiter.release()
class Queue(object):
......@@ -107,17 +98,18 @@ class Queue(object):
The queue is always infinite size.
"""
__slots__ = ('_queue', '_mutex', '_not_empty', 'unfinished_tasks')
def __init__(self):
self.queue = deque()
self._queue = deque()
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = Lock()
self._mutex = Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = Condition(self.mutex)
self._not_empty = _Condition(self._mutex)
self.unfinished_tasks = 0
......@@ -135,7 +127,7 @@ class Queue(object):
Raises a ValueError if called more times than there were items
placed in the queue.
"""
with self.mutex:
with self._mutex:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
......@@ -144,8 +136,7 @@ class Queue(object):
def qsize(self, len=len):
"""Return the approximate size of the queue (not reliable!)."""
with self.mutex:
return len(self.queue)
return len(self._queue)
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
......@@ -158,16 +149,16 @@ class Queue(object):
def put(self, item):
"""Put an item into the queue.
"""
with self.mutex:
self.queue.append(item)
with self._not_empty:
self._queue.append(item)
self.unfinished_tasks += 1
self.not_empty.notify()
self._not_empty.notify_one()
def get(self):
"""Remove and return an item from the queue.
"""
with self.mutex:
while not self.queue:
self.not_empty.wait()
item = self.queue.popleft()
with self._not_empty:
while not self._queue:
self._not_empty.wait()
item = self._queue.popleft()
return item
......@@ -13,8 +13,8 @@ provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
method blocks if the number of greenlets in the pool has already
reached the limit, until there is a free slot.
"""
from __future__ import print_function, absolute_import, division
from bisect import insort_right
try:
from itertools import izip
except ImportError:
......@@ -28,7 +28,11 @@ from gevent.timeout import Timeout
from gevent.event import Event
from gevent.lock import Semaphore, DummySemaphore
__all__ = ['Group', 'Pool', 'PoolFull']
__all__ = [
'Group',
'Pool',
'PoolFull',
]
class IMapUnordered(Greenlet):
......@@ -36,12 +40,11 @@ class IMapUnordered(Greenlet):
At iterator of map results.
"""
_zipped = False
def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
"""
An iterator that.
:keyword callable spawn: The function we use to
:keyword int maxsize: If given and not-None, specifies the maximum number of
finished results that will be allowed to accumulated awaiting the reader;
more than that number of results will cause map function greenlets to begin
......@@ -56,7 +59,6 @@ class IMapUnordered(Greenlet):
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
if _zipped:
self._zipped = _zipped
self.func = func
self.iterable = iterable
......@@ -82,19 +84,14 @@ class IMapUnordered(Greenlet):
factory = DummySemaphore
self._result_semaphore = factory(maxsize)
self.count = 0
self._outstanding_tasks = 0
# The index (zero based) of the maximum number of
# results we will have.
self._max_index = -1
self.finished = False
# If the queue size is unbounded, then we want to call all
# the links (_on_finish and _on_result) directly in the hub greenlet
# for efficiency. However, if the queue is bounded, we can't do that if
# the queue might block (because if there's no waiter the hub can switch to,
# the queue simply raises Full). Therefore, in that case, we use
# the safer, somewhat-slower (because it spawns a greenlet) link() methods.
# This means that _on_finish and _on_result can be called and interleaved in any order
# if the call to self.queue.put() blocks..
# Note that right now we're not bounding the queue, instead using a semaphore.
self.rawlink(self._on_finish)
# We're iterating in a different greenlet than we're running.
def __iter__(self):
return self
......@@ -109,10 +106,11 @@ class IMapUnordered(Greenlet):
def _inext(self):
return self.queue.get()
def _ispawn(self, func, item):
def _ispawn(self, func, item, item_index):
self._result_semaphore.acquire()
self.count += 1
self._outstanding_tasks += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g._imap_task_index = item_index
g.rawlink(self._on_result)
return g
......@@ -120,23 +118,21 @@ class IMapUnordered(Greenlet):
try:
func = self.func
for item in self.iterable:
self._ispawn(func, item)
self._max_index += 1
self._ispawn(func, item, self._max_index)
self._on_finish(None)
except BaseException as e:
self._on_finish(e)
raise
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
self.__dict__.pop('iterable', None)
def _on_result(self, greenlet):
# This method can either be called in the hub greenlet (if the
# queue is unbounded) or its own greenlet. If it's called in
# its own greenlet, the calls to put() may block and switch
# greenlets, which in turn could mutate our state. So any
# state on this object that we need to look at, notably
# self.count, we need to capture or mutate *before* we put.
# (Note that right now we're not bounding the queue, but we may
# choose to do so in the future so this implementation will be left in case.)
self.count -= 1
count = self.count
# This method will be called in the hub greenlet (we rawlink)
self._outstanding_tasks -= 1
count = self._outstanding_tasks
finished = self.finished
ready = self.ready()
put_finished = False
......@@ -151,20 +147,21 @@ class IMapUnordered(Greenlet):
self.queue.put(self._iqueue_value_for_failure(greenlet))
if put_finished:
self.queue.put(self._iqueue_value_for_finished())
self.queue.put(self._iqueue_value_for_self_finished())
def _on_finish(self, _self):
def _on_finish(self, exception):
# Called in this greenlet.
if self.finished:
return
if not self.successful():
if exception is not None:
self.finished = True
self.queue.put(self._iqueue_value_for_self_failure())
self.queue.put(self._iqueue_value_for_self_failure(exception))
return
if self.count <= 0:
if self._outstanding_tasks <= 0:
self.finished = True
self.queue.put(self._iqueue_value_for_finished())
self.queue.put(self._iqueue_value_for_self_finished())
def _iqueue_value_for_success(self, greenlet):
return greenlet.value
......@@ -172,75 +169,91 @@ class IMapUnordered(Greenlet):
def _iqueue_value_for_failure(self, greenlet):
return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
def _iqueue_value_for_finished(self):
def _iqueue_value_for_self_finished(self):
return Failure(StopIteration)
def _iqueue_value_for_self_failure(self):
return Failure(self.exception, self._raise_exception)
def _iqueue_value_for_self_failure(self, exception):
return Failure(exception, self._raise_exception)
class IMap(IMapUnordered):
# A specialization of IMapUnordered that returns items
# in the order in which they were generated, not
# the order in which they finish.
# We do this by storing tuples (order, value) in the queue
# not just value.
def __init__(self, *args, **kwargs):
self.waiting = [] # QQQ maybe deque will work faster there?
# The result dictionary: {index: value}
self._results = {}
# The index of the result to return next.
self.index = 0
self.maxindex = -1
IMapUnordered.__init__(self, *args, **kwargs)
def _inext(self):
while True:
if self.waiting and self.waiting[0][0] <= self.index:
_, value = self.waiting.pop(0)
else:
try:
value = self._results.pop(self.index)
except KeyError:
# Wait for our index to finish.
while 1:
index, value = self.queue.get()
if index > self.index:
insort_right(self.waiting, (index, value))
continue
if index == self.index:
break
else:
self._results[index] = value
self.index += 1
return value
def _ispawn(self, func, item):
g = IMapUnordered._ispawn(self, func, item)
self.maxindex += 1
g.index = self.maxindex
return g
def _iqueue_value_for_success(self, greenlet):
return (greenlet.index, IMapUnordered._iqueue_value_for_success(self, greenlet))
return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_success(self, greenlet))
def _iqueue_value_for_failure(self, greenlet):
return (greenlet.index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
def _iqueue_value_for_finished(self):
self.maxindex += 1
return (self.maxindex, IMapUnordered._iqueue_value_for_finished(self))
def _iqueue_value_for_self_finished(self):
return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_finished(self))
def _iqueue_value_for_self_failure(self):
self.maxindex += 1
return (self.maxindex, IMapUnordered._iqueue_value_for_self_failure(self))
def _iqueue_value_for_self_failure(self, exception):
return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_failure(self, exception))
class GroupMappingMixin(object):
# Internal, non-public API class.
# Provides mixin methods for implementing mapping pools. Subclasses must define:
# - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
# and `awargs`, potentially asynchronously. Return a value with a `get` method that
# blocks until the results of func are available, and a `link` method.
def spawn(self, func, *args, **kwargs):
"""
A function that runs *func* with *args* and *kwargs*, potentially
asynchronously. Return a value with a ``get`` method that blocks
until the results of func are available, and a ``rawlink`` method
that calls a callback when the results are available.
# - self._apply_immediately(): should the function passed to apply be called immediately,
# synchronously?
If this object has an upper bound on how many asyncronously executing
tasks can exist, this method may block until a slot becomes available.
"""
raise NotImplementedError()
# - self._apply_async_use_greenlet(): Should apply_async directly call
# Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block
def _apply_immediately(self):
"""
should the function passed to apply be called immediately,
synchronously?
"""
raise NotImplementedError()
# - self._apply_async_cb_spawn(callback, result): Run the given callback function, possiblly
# asynchronously, possibly synchronously.
def _apply_async_use_greenlet(self):
"""
Should apply_async directly call Greenlet.spawn(), bypassing
`spawn`?
Return true when self.spawn would block.
"""
raise NotImplementedError()
def _apply_async_cb_spawn(self, callback, result):
"""
Run the given callback function, possibly
asynchronously, possibly synchronously.
"""
raise NotImplementedError()
def apply_cb(self, func, args=None, kwds=None, callback=None):
"""
......@@ -325,13 +338,46 @@ class GroupMappingMixin(object):
return func(*args, **kwds)
return self.spawn(func, *args, **kwds).get()
def __map(self, func, iterable):
return [g.get() for g in
[self.spawn(func, i) for i in iterable]]
def map(self, func, iterable):
"""Return a list made by applying the *func* to each element of
the iterable.
.. seealso:: :meth:`imap`
"""
return list(self.imap(func, iterable))
# We can't return until they're all done and in order. It
# wouldn't seem to much matter what order we wait on them in,
# so the simple, fast (50% faster than imap) solution would be:
# return [g.get() for g in
# [self.spawn(func, i) for i in iterable]]
# If the pool size is unlimited (or more than the len(iterable)), this
# is equivalent to imap (spawn() will never block, all of them run concurrently,
# we call get() in the order the iterable was given).
# Now lets imagine the pool if is limited size. Suppose the
# func is time.sleep, our pool is limited to 3 threads, and
# our input is [10, 1, 10, 1, 1] We would start three threads,
# one to sleep for 10, one to sleep for 1, and the last to
# sleep for 10. We would block starting the fourth thread. At
# time 1, we would finish the second thread and start another
# one for time 1. At time 2, we would finish that one and
# start the last thread, and then begin executing get() on the first
# thread.
# Because it's spawn that blocks, this is *also* equivalent to what
# imap would do.
# The one remaining difference is that imap runs in its own
# greenlet, potentially changing the way the event loop runs.
# That's easy enough to do.
g = Greenlet.spawn(self.__map, func, iterable)
return g.get()
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
......@@ -503,12 +549,12 @@ class Group(GroupMappingMixin):
def start(self, greenlet):
"""
Add the **unstarted** *greenlet* to the collection of greenlets
this group is monitoring, nd then start it.
this group is monitoring, and then start it.
"""
self.add(greenlet)
greenlet.start()
def spawn(self, *args, **kwargs):
def spawn(self, *args, **kwargs): # pylint:disable=arguments-differ
"""
Begin a new greenlet with the given arguments (which are passed
to the greenlet constructor) and add it to the collection of greenlets
......@@ -746,7 +792,8 @@ class Pool(Group):
until space is available.
Usually you should call :meth:`start` to track and start the greenlet
instead of using this lower-level method.
instead of using this lower-level method, or :meth:`spawn` to
also create the greenlet.
:keyword bool blocking: If True (the default), this function
will block until the pool has space or a timeout occurs. If
......
......@@ -597,7 +597,7 @@ class Channel(object):
self.getters.remove(waiter)
raise
finally:
timeout.cancel()
timeout.close()
def get_nowait(self):
return self.get(False)
......
......@@ -219,7 +219,7 @@ class ThreadPool(GroupMappingMixin):
# we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else"
thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
thread_result = ThreadResult(result, self.hub, semaphore.release)
task_queue.put((func, args, kwargs, thread_result))
self.adjust()
except:
......@@ -333,6 +333,21 @@ class ThreadPool(GroupMappingMixin):
# Always go to Greenlet because our self.spawn uses threads
return True
class _FakeAsync(object):
def send(self):
pass
close = stop = send
def __call_(self, result):
"fake out for 'receiver'"
def __bool__(self):
return False
__nonzero__ = __bool__
_FakeAsync = _FakeAsync()
class ThreadResult(object):
......@@ -340,9 +355,7 @@ class ThreadResult(object):
__slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value',
'context', 'hub', 'receiver')
def __init__(self, receiver, hub=None, call_when_ready=None):
if hub is None:
hub = get_hub()
def __init__(self, receiver, hub, call_when_ready):
self.receiver = receiver
self.hub = hub
self.context = None
......@@ -359,48 +372,45 @@ class ThreadResult(object):
def _on_async(self):
self.async_watcher.stop()
self.async_watcher.close()
if self._call_when_ready:
# Typically this is pool.semaphore.release and we have to
# call this in the Hub; if we don't we get the dreaded
# LoopExit (XXX: Why?)
self._call_when_ready()
try:
if self.exc_info:
self.hub.handle_error(self.context, *self.exc_info)
self.context = None
self.async_watcher = None
self.async_watcher = _FakeAsync
self.hub = None
self._call_when_ready = None
if self.receiver is not None:
self._call_when_ready = _FakeAsync
self.receiver(self)
finally:
self.receiver = None
self.receiver = _FakeAsync
self.value = None
if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None)
def destroy(self):
if self.async_watcher is not None:
self.async_watcher.stop()
self.async_watcher.close()
self.async_watcher = None
self.async_watcher = _FakeAsync
self.context = None
self.hub = None
self._call_when_ready = None
self.receiver = None
def _ready(self):
if self.async_watcher is not None:
self.async_watcher.send()
self._call_when_ready = _FakeAsync
self.receiver = _FakeAsync
def set(self, value):
self.value = value
self._ready()
self.async_watcher.send()
def handle_error(self, context, exc_info):
self.context = context
self.exc_info = exc_info
self._ready()
self.async_watcher.send()
# link protocol:
def successful(self):
......
......@@ -375,13 +375,15 @@ class TestPool(greentest.TestCase): # pylint:disable=too-many-public-methods
it = self.pool.imap_unordered(sqr_random_sleep, range(SMALL_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(SMALL_RANGE))))
def test_empty(self):
def test_empty_imap_unordered(self):
it = self.pool.imap_unordered(sqr, [])
self.assertEqual(list(it), [])
def test_empty_imap(self):
it = self.pool.imap(sqr, [])
self.assertEqual(list(it), [])
def test_empty_map(self):
self.assertEqual(self.pool.map(sqr, []), [])
def test_terminate(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment