Commit 50a3130b authored by Jason Madden's avatar Jason Madden

Optimizations for threadpool

Especially for map. None of the pools really need map to go through
imap since they have to wait for everything anyway and they return
results ordered.

| Benchmark          | 36_threadpool_master | 36_threadpool_opt_cond5     |
|--------------------|----------------------|-----------------------------|
| imap_unordered_seq | 1.15 ms              | 1.07 ms: 1.08x faster (-7%) |
| imap_unordered_par | 1.02 ms              | 950 us: 1.08x faster (-7%)  |
| imap_seq           | 1.17 ms              | 1.10 ms: 1.06x faster (-6%) |
| imap_par           | 1.07 ms              | 1000 us: 1.07x faster (-7%) |
| map_seq            | 1.16 ms              | 724 us: 1.60x faster (-37%) |
| map_par            | 1.07 ms              | 646 us: 1.66x faster (-40%) |
| apply              | 1.22 ms              | 1.14 ms: 1.07x faster (-7%) |
| spawn              | 1.21 ms              | 1.13 ms: 1.07x faster (-7%) |
parent 277e34ca
# -*- coding: utf-8 -*-
"""
Benchmarks for thread locals.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import perf
from gevent.threadpool import ThreadPool
# Python 2/3 compatibility: keep the lazy ``xrange`` when the runtime
# provides it, otherwise fall back to Python 3's ``range``.
try:
    _lazy_range = xrange
except NameError:
    _lazy_range = range
xrange = _lazy_range
def noop():
    """Do nothing; the cheapest possible task for the pool."""
    pass
def identity(i):
    """Return *i* unchanged; a trivial mapping function for the pool."""
    result = i
    return result
# Number of worker threads used by the "parallel" (_par) benchmarks.
PAR_COUNT = 5
# Number of tasks submitted per timed benchmark loop iteration.
N = 20
def bench_apply(loops):
    """Time ``loops * N`` synchronous ``apply`` calls on a one-thread pool.

    The pool shutdown (``join``/``kill``) is included in the timed region,
    matching the other benchmarks in this file.
    """
    pool = ThreadPool(1)
    start = perf.perf_counter()
    # Flattened form of the original nested loops:
    # ``loops`` iterations of ``N`` applies each.
    for _unused in xrange(loops * N):
        pool.apply(noop)
    pool.join()
    pool.kill()
    return perf.perf_counter() - start
def bench_spawn_wait(loops):
    """Time ``spawn`` followed by a blocking ``get`` on a one-thread pool.

    Pool shutdown is part of the timed region, as in the other benchmarks.
    """
    pool = ThreadPool(1)
    start = perf.perf_counter()
    # Flattened form of the original nested loops.
    for _unused in xrange(loops * N):
        pool.spawn(noop).get()
    pool.join()
    pool.kill()
    return perf.perf_counter() - start
def _map(pool, pool_func, loops):
    """Time *loops* invocations of ``pool_func(identity, data)`` on *pool*.

    ``pool_func`` is one of the pool's ``map``/``imap``/``imap_unordered``
    bound methods. The pool is shut down inside the timed region.
    """
    data = [1] * N
    start = perf.perf_counter()
    for _unused in xrange(loops):
        # Consume the whole iterable: the imap variants are lazy and
        # would otherwise perform no work.
        list(pool_func(identity, data))
    pool.join()
    pool.kill()
    return perf.perf_counter() - start
def _ppool():
    """Return a ``ThreadPool`` with ``PAR_COUNT`` workers.

    The explicit ``size`` assignment after construction presumably forces
    the worker threads to start eagerly, keeping thread startup cost out
    of the timed sections — TODO confirm against gevent's ThreadPool docs.
    """
    parallel_pool = ThreadPool(PAR_COUNT)
    parallel_pool.size = PAR_COUNT
    return parallel_pool
def _run_map_bench(loops, method_name, parallel):
    # Shared driver for the six map-style benchmarks: build the right
    # pool, look up the mapping method by name, and delegate to _map.
    pool = _ppool() if parallel else ThreadPool(1)
    return _map(pool, getattr(pool, method_name), loops)


def bench_map_seq(loops):
    """map() on a single-worker pool."""
    return _run_map_bench(loops, 'map', False)


def bench_map_par(loops):
    """map() on a PAR_COUNT-worker pool."""
    return _run_map_bench(loops, 'map', True)


def bench_imap_seq(loops):
    """imap() on a single-worker pool."""
    return _run_map_bench(loops, 'imap', False)


def bench_imap_par(loops):
    """imap() on a PAR_COUNT-worker pool."""
    return _run_map_bench(loops, 'imap', True)


def bench_imap_un_seq(loops):
    """imap_unordered() on a single-worker pool."""
    return _run_map_bench(loops, 'imap_unordered', False)


def bench_imap_un_par(loops):
    """imap_unordered() on a PAR_COUNT-worker pool."""
    return _run_map_bench(loops, 'imap_unordered', True)
def main():
    """Register and run every benchmark with a perf Runner."""
    runner = perf.Runner()
    # (benchmark name, benchmark function), in the original registration
    # order so the report layout is unchanged.
    benchmarks = (
        ('imap_unordered_seq', bench_imap_un_seq),
        ('imap_unordered_par', bench_imap_un_par),
        ('imap_seq', bench_imap_seq),
        ('imap_par', bench_imap_par),
        ('map_seq', bench_map_seq),
        ('map_par', bench_map_par),
        ('apply', bench_apply),
        ('spawn', bench_spawn_wait),
    )
    for bench_name, bench_func in benchmarks:
        runner.bench_time_func(bench_name, bench_func)
# Run the benchmark suite when executed directly as a script.
if __name__ == '__main__':
    main()
......@@ -8,14 +8,12 @@ This module is missing 'Thread' class, but includes 'Queue'.
from __future__ import absolute_import
from collections import deque
from itertools import islice as _islice
from gevent import monkey
from gevent._compat import thread_mod_name
__all__ = [
'Condition',
'Lock',
'Queue',
]
......@@ -26,14 +24,12 @@ start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name,
])
class Condition(object):
class _Condition(object):
# pylint:disable=method-hidden
def __init__(self, lock):
self.__lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
......@@ -50,9 +46,13 @@ class Condition(object):
except AttributeError:
pass
self.__waiters = []
# Capture the bound method to save some time returning it
self.__notify_one = self._notify_one
self.__wait = self._wait
def __enter__(self):
return self.__lock.__enter__()
self.__lock.__enter__()
return self.__wait, self.__notify_one
def __exit__(self, *args):
return self.__lock.__exit__(*args)
......@@ -74,9 +74,10 @@ class Condition(object):
return False
return True
def wait(self):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
def _wait(self):
# The condition MUST be owned; the only way to get this
# method is through __enter__, so it is guaranteed and we don't
# need to check it.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
......@@ -86,19 +87,17 @@ class Condition(object):
finally:
self._acquire_restore(saved_state)
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self.__waiters
waiters_to_notify = deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
def _notify_one(self):
# The condition MUST be owned; the only way to get this
# method is through __enter__, so it is guaranteed and we
# don't need to check it.
try:
all_waiters.remove(waiter)
except ValueError:
waiter = self.__waiters.pop()
except IndexError:
# Nobody around
pass
else:
waiter.release()
class Queue(object):
......@@ -107,17 +106,18 @@ class Queue(object):
The queue is always infinite size.
"""
__slots__ = ('_queue', '_mutex', '_not_empty', 'unfinished_tasks')
def __init__(self):
self.queue = deque()
self._queue = deque()
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = Lock()
self._mutex = Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = Condition(self.mutex)
self._not_empty = _Condition(self._mutex)
self.unfinished_tasks = 0
......@@ -135,7 +135,7 @@ class Queue(object):
Raises a ValueError if called more times than there were items
placed in the queue.
"""
with self.mutex:
with self._mutex:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
......@@ -144,8 +144,7 @@ class Queue(object):
def qsize(self, len=len):
"""Return the approximate size of the queue (not reliable!)."""
with self.mutex:
return len(self.queue)
return len(self._queue)
def empty(self):
"""Return True if the queue is empty, False otherwise (not reliable!)."""
......@@ -158,16 +157,16 @@ class Queue(object):
def put(self, item):
"""Put an item into the queue.
"""
with self.mutex:
self.queue.append(item)
with self._not_empty as (_, notify_one):
self._queue.append(item)
self.unfinished_tasks += 1
self.not_empty.notify()
notify_one()
def get(self):
"""Remove and return an item from the queue.
"""
with self.mutex:
while not self.queue:
self.not_empty.wait()
item = self.queue.popleft()
with self._not_empty as (wait, _):
while not self._queue:
wait()
item = self._queue.popleft()
return item
......@@ -331,7 +331,17 @@ class GroupMappingMixin(object):
.. seealso:: :meth:`imap`
"""
return list(self.imap(func, iterable))
# We can't return until they're all done, so it doesn't much matter
# what order we wait on them in.
# We used to do:
# return list(self.imap(func, iterable))
# Which is concise but expensive.
# Must reify the list of tasks though, to get as many *actually* spawned
# as possible.
glets = [self.spawn(func, i) for i in iterable]
return [g.get() for g in glets]
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
......
......@@ -219,7 +219,7 @@ class ThreadPool(GroupMappingMixin):
# we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else"
thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
thread_result = ThreadResult(result, self.hub, semaphore.release)
task_queue.put((func, args, kwargs, thread_result))
self.adjust()
except:
......@@ -333,6 +333,21 @@ class ThreadPool(GroupMappingMixin):
# Always go to Greenlet because our self.spawn uses threads
return True
class _FakeAsync(object):
def send(self):
pass
close = stop = send
def __call_(self, result):
"fake out for 'receiver'"
def __bool__(self):
return False
__nonzero__ = __bool__
_FakeAsync = _FakeAsync()
class ThreadResult(object):
......@@ -340,9 +355,7 @@ class ThreadResult(object):
__slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value',
'context', 'hub', 'receiver')
def __init__(self, receiver, hub=None, call_when_ready=None):
if hub is None:
hub = get_hub()
def __init__(self, receiver, hub, call_when_ready):
self.receiver = receiver
self.hub = hub
self.context = None
......@@ -359,48 +372,45 @@ class ThreadResult(object):
def _on_async(self):
self.async_watcher.stop()
self.async_watcher.close()
if self._call_when_ready:
# Typically this is pool.semaphore.release and we have to
# call this in the Hub; if we don't we get the dreaded
# LoopExit (XXX: Why?)
self._call_when_ready()
try:
if self.exc_info:
self.hub.handle_error(self.context, *self.exc_info)
self.context = None
self.async_watcher = None
self.async_watcher = _FakeAsync
self.hub = None
self._call_when_ready = None
if self.receiver is not None:
self._call_when_ready = _FakeAsync
self.receiver(self)
finally:
self.receiver = None
self.receiver = _FakeAsync
self.value = None
if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None)
def destroy(self):
if self.async_watcher is not None:
self.async_watcher.stop()
self.async_watcher.close()
self.async_watcher = None
self.async_watcher = _FakeAsync
self.context = None
self.hub = None
self._call_when_ready = None
self.receiver = None
def _ready(self):
if self.async_watcher is not None:
self.async_watcher.send()
self._call_when_ready = _FakeAsync
self.receiver = _FakeAsync
def set(self, value):
self.value = value
self._ready()
self.async_watcher.send()
def handle_error(self, context, exc_info):
self.context = context
self.exc_info = exc_info
self._ready()
self.async_watcher.send()
# link protocol:
def successful(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment