Commit aae6b469 authored by Jason Madden's avatar Jason Madden

Merge branch 'issue1331'

parents 9f2b1fd9 c2665e8f
......@@ -51,6 +51,17 @@
- Fix some potential crashes using libuv async watchers.
- Make ``ThreadPool`` consistently raise ``InvalidThreadUseError``
when ``spawn`` is called from a thread different than the thread
that created the threadpool. This has never been allowed, but was
inconsistently enforced. On gevent 1.3 and before, this would always
raise "greenlet error: invalid thread switch," or ``LoopExit``. On
gevent 1.4, it *could* raise ``LoopExit``, depending on the number
of tasks, but still, calling it from a different thread was likely
to corrupt libev or libuv internals.
- Remove some undocumented, deprecated functions from the threadpool module.
1.5a2 (2019-10-21)
==================
......
......@@ -23,7 +23,7 @@
the `func` synchronously.
.. note:: As implemented, attempting to use
:meth:`Threadpool.appy` from inside another function that
:meth:`Threadpool.apply` from inside another function that
was itself spawned in a threadpool (any threadpool) will
cause the function to be run immediately.
......
......@@ -38,7 +38,7 @@ class AbstractLinkable(object):
'__weakref__'
)
def __init__(self):
def __init__(self, hub=None):
# Before this implementation, AsyncResult and Semaphore
# maintained the order of notifications, but Event did not.
......@@ -71,7 +71,7 @@ class AbstractLinkable(object):
self._notify_all = True
# we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub
self.hub = None
self.hub = hub
def linkcount(self):
# For testing: how many objects are linked to this one?
......
......@@ -44,10 +44,10 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
unlinks waiters before calling them.
"""
def __init__(self, value=1):
def __init__(self, value=1, hub=None):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__()
super(Semaphore, self).__init__(hub)
self.counter = value
self._notify_all = False
......
......@@ -77,3 +77,26 @@ class ConcurrentObjectUseError(AssertionError):
.. seealso:: `gevent.socket.wait`
"""
class InvalidThreadUseError(RuntimeError):
"""
Raised when an object is used from a different thread than
the one it is bound to.
Some objects, such as gevent sockets, semaphores, and threadpools,
are tightly bound to their hub and its loop. The hub and loop
are not thread safe, with a few exceptions. Attempting to use
such objects from a different thread is an error, and may cause
problems ranging from incorrect results to memory corruption
and a crashed process.
In some cases, gevent catches this "accidentally", and the result is
a `LoopExit`. In some cases, gevent doesn't catch this at all.
In other cases (typically when the consequences are suspected to
be more on the more severe end of the scale, and when the operation in
question is already relatively heavyweight), gevent explicitly checks
for this usage and will raise this exception when it is detected.
.. versionadded:: 1.5a3
"""
......@@ -40,6 +40,8 @@ class GroupMappingMixin(object):
# Internal, non-public API class.
# Provides mixin methods for implementing mapping pools. Subclasses must define:
__slots__ = ()
def spawn(self, func, *args, **kwargs):
"""
A function that runs *func* with *args* and *kwargs*, potentially
......
......@@ -6,11 +6,13 @@ import random
import weakref
import gc
import gevent.testing as greentest
import gevent.threadpool
from gevent.threadpool import ThreadPool
import gevent
from gevent.exceptions import InvalidThreadUseError
import gevent.testing as greentest
from gevent.testing import ExpectedException
from gevent.testing import PYPY
......@@ -34,34 +36,36 @@ class TestCase(greentest.TestCase):
# These generally need more time
__timeout__ = greentest.LARGE_TIMEOUT
pool = None
_all_pools = ()
ClassUnderTest = ThreadPool
def _FUT(self):
return self.ClassUnderTest
def _makeOne(self, size, increase=greentest.RUN_LEAKCHECKS):
self.pool = pool = self._FUT()(size)
if increase:
def _makeOne(self, maxsize, create_all_worker_threads=greentest.RUN_LEAKCHECKS):
self.pool = pool = self._FUT()(maxsize)
self._all_pools += (pool,)
if create_all_worker_threads:
# Max size to help eliminate false positives
self.pool.size = size
self.pool.size = maxsize
return pool
def cleanup(self):
pool = self.pool
if pool is not None:
self.pool = None
all_pools, self._all_pools = self._all_pools, ()
for pool in all_pools:
kill = getattr(pool, 'kill', None) or getattr(pool, 'shutdown')
kill()
del kill
del self.pool
if greentest.RUN_LEAKCHECKS:
# Each worker thread created a greenlet object and switched to it.
# It's a custom subclass, but even if it's not, it appears that
# the root greenlet for the new thread sticks around until there's a
# gc. Simply calling 'getcurrent()' is enough to "leak" a greenlet.greenlet
# and a weakref.
for _ in range(3):
gc.collect()
if greentest.RUN_LEAKCHECKS:
# Each worker thread created a greenlet object and switched to it.
# It's a custom subclass, but even if it's not, it appears that
# the root greenlet for the new thread sticks around until there's a
# gc. Simply calling 'getcurrent()' is enough to "leak" a greenlet.greenlet
# and a weakref.
for _ in range(3):
gc.collect()
class PoolBasicTests(TestCase):
......@@ -353,7 +357,7 @@ class TestSpawn(TestCase):
switch_expected = True
@greentest.ignores_leakcheck
def test(self):
def test_basics(self):
pool = self._makeOne(1)
self.assertEqual(len(pool), 0)
log = []
......@@ -371,6 +375,20 @@ class TestSpawn(TestCase):
self.assertEqual(log, ['a', 'b'])
self.assertEqual(len(pool), 0)
@greentest.ignores_leakcheck
def test_cannot_spawn_from_other_thread(self):
# Only the thread that owns a threadpool can spawn to it;
# this is because the threadpool uses the creating thread's hub,
# which is not threadsafe.
pool1 = self._makeOne(1)
pool2 = self._makeOne(2)
def func():
pool2.spawn(lambda: "Hi")
res = pool1.spawn(func)
with self.assertRaises(InvalidThreadUseError):
res.get()
def error_iter():
yield 1
......@@ -430,7 +448,7 @@ class TestSize(TestCase):
@greentest.reraises_flaky_race_condition()
def test(self):
pool = self.pool = self._makeOne(2, increase=False)
pool = self.pool = self._makeOne(2, create_all_worker_threads=False)
self.assertEqual(pool.size, 0)
pool.size = 1
self.assertEqual(pool.size, 1)
......
# Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import os
from weakref import ref as wref
from greenlet import greenlet as RawGreenlet
from gevent._compat import integer_types
from gevent.event import AsyncResult
from gevent.exceptions import InvalidThreadUseError
from gevent.greenlet import Greenlet
from gevent.hub import _get_hub
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import getcurrent
from gevent.hub import sleep
from gevent.hub import _get_hub
from gevent.event import AsyncResult
from gevent.greenlet import Greenlet
from gevent.pool import GroupMappingMixin
from gevent.lock import Semaphore
from gevent.pool import GroupMappingMixin
from gevent._threading import Lock
from gevent._threading import Queue
......@@ -36,41 +38,115 @@ class _WorkerGreenlet(RawGreenlet):
def __init__(self, threadpool):
RawGreenlet.__init__(self, threadpool._worker)
self.thread_ident = get_thread_ident()
self._threadpool_wref = wref(threadpool)
self._threadpool = threadpool
# Inform the gevent.util.GreenletTree that this should be
# considered the root (for printing purposes) and to
# ignore the parent attribute. (We can't set parent to None.)
self.greenlet_tree_is_root = True
self.parent.greenlet_tree_is_ignored = True
@classmethod
def run_in_worker_thread(cls, threadpool):
# The root function of each new worker thread.
glet = cls(threadpool)
glet.switch()
def __repr__(self):
return "<ThreadPoolWorker at 0x%x thread_ident=0x%x %s>" % (
id(self),
self.thread_ident,
self._threadpool_wref())
self._threadpool
)
class ThreadPool(GroupMappingMixin):
# TODO: Document thread safety restrictions.
"""
A pool of native worker threads.
This can be useful for CPU intensive functions, or those that
otherwise will not cooperate with gevent. The best functions to execute
in a thread pool are small functions with a single purpose; ideally they release
the CPython GIL. Such functions are extension functions implemented in C.
It implements the same operations as a :class:`gevent.pool.Pool`,
but using threads instead of greenlets.
.. note:: The method :meth:`apply_async` will always return a new
greenlet, bypassing the threadpool entirely.
Most users will not need to create instances of this class. Instead,
use the threadpool already associated with gevent's hub::
pool = gevent.get_hub().threadpool
result = pool.spawn(lambda: "Some func").get()
.. important:: It is only possible to use instances of this class from
the thread running their hub. Typically that means from the thread that
created them. Using the pattern shown above takes care of this.
There is no gevent-provided way to have a single process-wide limit on the
number of threads in various pools when doing that, however. The suggested
way to use gevent and threadpools is to have a single gevent hub
and its one threadpool (which is the default without doing any extra work).
Only dispatch minimal blocking functions to the threadpool, functions that
do not use the gevent hub.
The `len` of instances of this class is the number of enqueued
(unfinished) tasks.
.. caution:: Instances of this class are only true if they have
unfinished tasks.
.. versionchanged:: 1.5a3
The undocumented ``apply_e`` function, deprecated since 1.1,
was removed.
"""
__slots__ = (
'hub',
'_maxsize',
# A Greenlet that runs to adjust the number of worker
# threads.
'manager',
# The PID of the process we were created in.
# Used to help detect a fork and then re-create
# internal state.
'pid',
'fork_watcher',
# A semaphore initialized with ``maxsize`` counting the
# number of available worker threads we have. As a
# gevent.lock.Semaphore, this is only safe to use from a single
# native thread.
'_available_worker_threads_greenlet_sem',
# A native threading lock, used to protect internals
# that are safe to call across multiple threads.
'_native_thread_internal_lock',
# The number of running worker threads
'_num_worker_threads',
# The task queue is itself safe to use from multiple
# native threads.
'task_queue',
)
def __init__(self, maxsize, hub=None):
if hub is None:
hub = get_hub()
self.hub = hub
self._maxsize = 0
self.manager = None
self.pid = os.getpid()
self.manager = None
self.task_queue = Queue()
self.fork_watcher = None
self._maxsize = 0
# Note that by starting with 1, we actually allow
# maxsize + 1 tasks in the queue.
self._available_worker_threads_greenlet_sem = Semaphore(1, hub)
self._native_thread_internal_lock = Lock()
self._num_worker_threads = 0
self._set_maxsize(maxsize)
self.fork_watcher = hub.loop.fork(ref=False)
try:
self._init(maxsize)
except:
self.fork_watcher.close()
raise
def _set_maxsize(self, maxsize):
if not isinstance(maxsize, integer_types):
......@@ -78,16 +154,23 @@ class ThreadPool(GroupMappingMixin):
if maxsize < 0:
raise ValueError('maxsize must not be negative: %r' % (maxsize, ))
difference = maxsize - self._maxsize
self._semaphore.counter += difference
self._available_worker_threads_greenlet_sem.counter += difference
self._maxsize = maxsize
self.adjust()
# make sure all currently blocking spawn() start unlocking if maxsize increased
self._semaphore._start_notify()
self._available_worker_threads_greenlet_sem._start_notify()
def _get_maxsize(self):
return self._maxsize
maxsize = property(_get_maxsize, _set_maxsize)
maxsize = property(_get_maxsize, _set_maxsize, doc="""\
The maximum allowed number of worker threads.
This is also (approximately) a limit on the number of tasks that
can be queued without blocking the waiting greenlet. If this many
tasks are already running, then the next greenlet that submits a task
will block waiting for a task to finish.
""")
def __repr__(self):
return '<%s at 0x%x %s/%s/%s hub=<%s at 0x%x thread_ident=0x%s>>' % (
......@@ -103,7 +186,9 @@ class ThreadPool(GroupMappingMixin):
return self.task_queue.unfinished_tasks
def _get_size(self):
return self._size
# TODO: This probably needs to acquire the lock. We
# modify this from self._add_thread under a lock.
return self._num_worker_threads
def _set_size(self, size):
if size < 0:
......@@ -112,29 +197,32 @@ class ThreadPool(GroupMappingMixin):
raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
if self.manager:
self.manager.kill()
while self._size < size:
while self._num_worker_threads < size:
self._add_thread()
delay = self.hub.loop.approx_timer_resolution
while self._size > size:
while self._size - size > self.task_queue.unfinished_tasks:
while self._num_worker_threads > size:
while self._num_worker_threads - size > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
if getcurrent() is self.hub:
break
sleep(delay)
delay = min(delay * 2, .05)
if self._size:
if self._num_worker_threads:
self.fork_watcher.start(self._on_fork)
else:
self.fork_watcher.stop()
size = property(_get_size, _set_size)
size = property(_get_size, _set_size, doc="""\
The number of running pooled worker threads.
Setting this attribute will add or remove running
worker threads, up to `maxsize`.
Initially there are no pooled running worker threads, and
threads are created on demand to satisfy concurrent
requests up to `maxsize` threads.
""")
def _init(self, maxsize):
self._size = 0
self._semaphore = Semaphore(1)
self._lock = Lock()
self.task_queue = Queue()
self._set_maxsize(maxsize)
def _on_fork(self):
# fork() only leaves one thread; also screws up locks;
......@@ -142,11 +230,9 @@ class ThreadPool(GroupMappingMixin):
# NOTE: See comment in gevent.hub.reinit.
pid = os.getpid()
if pid != self.pid:
self.pid = pid
# Do not mix fork() and threads; since fork() only copies one thread
# all objects referenced by other threads has refcount that will never
# go down to 0.
self._init(self._maxsize)
# The OS threads have been destroyed, but the Python objects
# may live on, creating refcount "leaks"
self.__init__(self._maxsize)
def join(self):
"""Waits until all outstanding tasks have been completed."""
......@@ -161,68 +247,87 @@ class ThreadPool(GroupMappingMixin):
def _adjust_step(self):
# if there is a possibility & necessity for adding a thread, do it
while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
while (self._num_worker_threads < self._maxsize
and self.task_queue.unfinished_tasks > self._num_worker_threads):
self._add_thread()
# while the number of threads is more than maxsize, kill one
# we do not check what's already in task_queue - it could be all Nones
while self._size - self._maxsize > self.task_queue.unfinished_tasks:
while self._num_worker_threads - self._maxsize > self.task_queue.unfinished_tasks:
self.task_queue.put(None)
if self._size:
if self._num_worker_threads:
self.fork_watcher.start(self._on_fork)
else:
elif self.fork_watcher is not None:
self.fork_watcher.stop()
def _adjust_wait(self):
delay = 0.0001
while True:
self._adjust_step()
if self._size <= self._maxsize:
if self._num_worker_threads <= self._maxsize:
return
sleep(delay)
delay = min(delay * 2, .05)
def adjust(self):
self._adjust_step()
if not self.manager and self._size > self._maxsize:
# might need to feed more Nones into the pool
if not self.manager and self._num_worker_threads > self._maxsize:
# might need to feed more Nones into the pool to shutdown
# threads.
self.manager = Greenlet.spawn(self._adjust_wait)
def _add_thread(self):
with self._lock:
self._size += 1
with self._native_thread_internal_lock:
self._num_worker_threads += 1
try:
start_new_thread(self.__trampoline, ())
start_new_thread(_WorkerGreenlet.run_in_worker_thread, (self,))
except:
with self._lock:
self._size -= 1
with self._native_thread_internal_lock:
self._num_worker_threads -= 1
raise
def spawn(self, func, *args, **kwargs):
"""
Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
Add a new task to the threadpool that will run ``func(*args,
**kwargs)``.
Waits until a slot is available. Creates a new native thread if necessary.
Waits until a slot is available. Creates a new native thread
if necessary.
This must only be called from the native thread that owns this object's
hub.
This must only be called from the native thread that owns this
object's hub. This is because creating the necessary data
structures to communicate back to this thread isn't thread
safe, so the hub must not be running something else. Also,
ensuring the pool size stays correct only works within a
single thread.
:return: A :class:`gevent.event.AsyncResult`.
:raises InvalidThreadUseError: If called from a different thread.
.. versionchanged:: 1.5
Document the thread-safety requirements.
"""
if self.hub != get_hub():
raise InvalidThreadUseError
while 1:
semaphore = self._semaphore
semaphore = self._available_worker_threads_greenlet_sem
semaphore.acquire()
if semaphore is self._semaphore:
if semaphore is self._available_worker_threads_greenlet_sem:
# If we were asked to change size or re-init we could have changed
# semaphore objects.
break
# Returned; lets a greenlet in this thread wait
# for the pool thread. Signaled when the async watcher
# is fired from the pool thread back into this thread.
result = AsyncResult()
task_queue = self.task_queue
# Encapsulates the async watcher the worker thread uses to
# call back into this thread. Immediately allocates and starts the
# async watcher in this thread, because it uses this hub/loop,
# which is not thread safe.
thread_result = None
try:
task_queue = self.task_queue
result = AsyncResult()
# XXX We're calling the semaphore release function in the hub, otherwise
# we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else"
assert self.hub == get_hub()
thread_result = ThreadResult(result, self.hub, semaphore.release)
task_queue.put((func, args, kwargs, thread_result))
self.adjust()
......@@ -236,33 +341,21 @@ class ThreadPool(GroupMappingMixin):
def _decrease_size(self):
if sys is None:
return
_lock = getattr(self, '_lock', None)
if _lock is not None:
_lock = self._native_thread_internal_lock
if _lock is not None: # XXX: When would this be None?
with _lock:
self._size -= 1
# XXX: This used to be false by default. It really seems like
# it should be true to avoid leaking resources.
_destroy_worker_hub = True
self._num_worker_threads -= 1
def __ignore_current_greenlet_blocking(self, hub):
if hub is not None and hub.periodic_monitoring_thread is not None:
hub.periodic_monitoring_thread.ignore_current_greenlet_blocking()
def __trampoline(self):
# The target that we create new threads with. It exists
# solely to create the _WorkerGreenlet and switch to it.
# (the __class__ of a raw greenlet cannot be changed.)
g = _WorkerGreenlet(self)
g.switch()
def _worker(self):
# pylint:disable=too-many-branches
need_decrease = True
try:
while 1: # tiny bit faster than True on Py2
h = _get_hub()
h = _get_hub() # Don't create one; only set if a worker function did it
if h is not None:
h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue
......@@ -302,26 +395,12 @@ class ThreadPool(GroupMappingMixin):
finally:
if need_decrease:
self._decrease_size()
if sys is not None and self._destroy_worker_hub:
if sys is not None:
hub = _get_hub()
if hub is not None:
hub.destroy(True)
del hub
def apply_e(self, expected_errors, function, args=None, kwargs=None):
"""
.. deprecated:: 1.1a2
Identical to :meth:`apply`; the ``expected_errors`` argument is ignored.
"""
# pylint:disable=unused-argument
# Deprecated but never documented. In the past, before
# self.apply() allowed all errors to be raised to the caller,
# expected_errors allowed a caller to specify a set of errors
# they wanted to be raised, through the wrap_errors function.
# In practice, it always took the value Exception or
# BaseException.
return self.apply(function, args, kwargs)
def _apply_immediately(self):
# If we're being called from a different thread than the one that
# created us, e.g., because a worker task is trying to use apply()
......@@ -354,6 +433,13 @@ class _FakeAsync(object):
_FakeAsync = _FakeAsync()
class ThreadResult(object):
"""
A one-time event for cross-thread communication.
Uses a hub's "async" watcher capability; it must be constructed and
destroyed in the thread running the hub (because creating, starting, and
destroying async watchers isn't guaranteed to be thread safe).
"""
# Using slots here helps to debug reference cycles/leaks
__slots__ = ('exc_info', 'async_watcher', '_call_when_ready', 'value',
......@@ -430,16 +516,6 @@ class ThreadResult(object):
return self.exception is None
def wrap_errors(errors, function, args, kwargs):
"""
.. deprecated:: 1.1a2
Previously used by ThreadPool.apply_e.
"""
try:
return True, function(*args, **kwargs)
except errors as ex:
return False, ex
try:
import concurrent.futures
except ImportError:
......@@ -560,10 +636,16 @@ else:
This is a provisional API.
"""
def __init__(self, max_workers):
super(ThreadPoolExecutor, self).__init__(max_workers)
self._threadpool = ThreadPool(max_workers)
self._threadpool._destroy_worker_hub = True
def __init__(self, *args, **kwargs):
"""
Takes the same arguments as ``concurrent.futures.ThreadPoolExecuter``, which
vary between Python versions.
The first argument is always *max_workers*, the maximum number of
threads to use. Most other arguments, while accepted, are ignored.
"""
super(ThreadPoolExecutor, self).__init__(*args, **kwargs)
self._threadpool = ThreadPool(self._max_workers)
def submit(self, fn, *args, **kwargs): # pylint:disable=arguments-differ
with self._shutdown_lock: # pylint:disable=not-context-manager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment