Commit 039af1ef authored by Jason Madden's avatar Jason Madden

Consistently raise exception when using a threadpool from the wrong thread.

Still needs better docs.
parent 6bed079a
......@@ -51,6 +51,16 @@
- Fix some potential crashes using libuv async watchers.
- Make ``ThreadPool`` consistently raise ``InvalidThreadUseError``
when ``spawn`` is called from a thread different than the thread
that created the threadpool. This has never been allowed, but was
inconsistently enforced. On gevent 1.3 and before, this would always
raise "greenlet error: invalid thread switch," or ``LoopExit``. On
gevent 1.4, it *could* raise ``LoopExit``, depending on the number
of tasks, but still, calling it from a different thread was likely
to corrupt libev or libuv internals.
1.5a2 (2019-10-21)
==================
......
......@@ -23,7 +23,7 @@
the `func` synchronously.
.. note:: As implemented, attempting to use
:meth:`Threadpool.appy` from inside another function that
:meth:`Threadpool.apply` from inside another function that
was itself spawned in a threadpool (any threadpool) will
cause the function to be run immediately.
......
......@@ -38,7 +38,7 @@ class AbstractLinkable(object):
'__weakref__'
)
def __init__(self):
def __init__(self, hub=None):
# Before this implementation, AsyncResult and Semaphore
# maintained the order of notifications, but Event did not.
......@@ -71,7 +71,7 @@ class AbstractLinkable(object):
self._notify_all = True
# we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub
self.hub = None
self.hub = hub
def linkcount(self):
# For testing: how many objects are linked to this one?
......
......@@ -44,10 +44,10 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
unlinks waiters before calling them.
"""
def __init__(self, value=1):
def __init__(self, value=1, hub=None):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__()
super(Semaphore, self).__init__(hub)
self.counter = value
self._notify_all = False
......
......@@ -77,3 +77,26 @@ class ConcurrentObjectUseError(AssertionError):
.. seealso:: `gevent.socket.wait`
"""
class InvalidThreadUseError(RuntimeError):
"""
Raised when an object is used from a different thread than
the one it is bound to.
Some objects, such as gevent sockets, semaphores, and threadpools,
are tightly bound to their hub and its loop. The hub and loop
are not thread safe, with a few exceptions. Attempting to use
such objects from a different thread is an error, and may cause
problems ranging from incorrect results to memory corruption
and a crashed process.
In some cases, gevent catches this "accidentally", and the result is
a `LoopExit`. In some cases, gevent doesn't catch this at all.
In other cases (typically when the consequences are suspected to
be more on the more severe end of the scale, and when the operation in
question is already relatively heavyweight), gevent explicitly checks
for this usage and will raise this exception when it is detected.
.. versionadded:: 1.5a3
"""
......@@ -40,6 +40,8 @@ class GroupMappingMixin(object):
# Internal, non-public API class.
# Provides mixin methods for implementing mapping pools. Subclasses must define:
__slots__ = ()
def spawn(self, func, *args, **kwargs):
"""
A function that runs *func* with *args* and *kwargs*, potentially
......
......@@ -6,11 +6,13 @@ import random
import weakref
import gc
import gevent.testing as greentest
import gevent.threadpool
from gevent.threadpool import ThreadPool
import gevent
from gevent.exceptions import InvalidThreadUseError
import gevent.testing as greentest
from gevent.testing import ExpectedException
from gevent.testing import PYPY
......@@ -34,34 +36,36 @@ class TestCase(greentest.TestCase):
# These generally need more time
__timeout__ = greentest.LARGE_TIMEOUT
pool = None
_all_pools = ()
ClassUnderTest = ThreadPool
def _FUT(self):
return self.ClassUnderTest
def _makeOne(self, size, increase=greentest.RUN_LEAKCHECKS):
self.pool = pool = self._FUT()(size)
if increase:
def _makeOne(self, maxsize, create_all_worker_threads=greentest.RUN_LEAKCHECKS):
self.pool = pool = self._FUT()(maxsize)
self._all_pools += (pool,)
if create_all_worker_threads:
# Max size to help eliminate false positives
self.pool.size = size
self.pool.size = maxsize
return pool
def cleanup(self):
pool = self.pool
if pool is not None:
self.pool = None
all_pools, self._all_pools = self._all_pools, ()
for pool in all_pools:
kill = getattr(pool, 'kill', None) or getattr(pool, 'shutdown')
kill()
del kill
del self.pool
if greentest.RUN_LEAKCHECKS:
# Each worker thread created a greenlet object and switched to it.
# It's a custom subclass, but even if it's not, it appears that
# the root greenlet for the new thread sticks around until there's a
# gc. Simply calling 'getcurrent()' is enough to "leak" a greenlet.greenlet
# and a weakref.
for _ in range(3):
gc.collect()
if greentest.RUN_LEAKCHECKS:
# Each worker thread created a greenlet object and switched to it.
# It's a custom subclass, but even if it's not, it appears that
# the root greenlet for the new thread sticks around until there's a
# gc. Simply calling 'getcurrent()' is enough to "leak" a greenlet.greenlet
# and a weakref.
for _ in range(3):
gc.collect()
class PoolBasicTests(TestCase):
......@@ -353,7 +357,7 @@ class TestSpawn(TestCase):
switch_expected = True
@greentest.ignores_leakcheck
def test(self):
def test_basics(self):
pool = self._makeOne(1)
self.assertEqual(len(pool), 0)
log = []
......@@ -371,6 +375,20 @@ class TestSpawn(TestCase):
self.assertEqual(log, ['a', 'b'])
self.assertEqual(len(pool), 0)
@greentest.ignores_leakcheck
def test_cannot_spawn_from_other_thread(self):
# Only the thread that owns a threadpool can spawn to it;
# this is because the threadpool uses the creating thread's hub,
# which is not threadsafe.
pool1 = self._makeOne(1)
pool2 = self._makeOne(2)
def func():
pool2.spawn(lambda: "Hi")
res = pool1.spawn(func)
with self.assertRaises(InvalidThreadUseError):
res.get()
def error_iter():
yield 1
......@@ -430,7 +448,7 @@ class TestSize(TestCase):
@greentest.reraises_flaky_race_condition()
def test(self):
pool = self.pool = self._makeOne(2, increase=False)
pool = self.pool = self._makeOne(2, create_all_worker_threads=False)
self.assertEqual(pool.size, 0)
pool.size = 1
self.assertEqual(pool.size, 1)
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment