Commit 588b34f0 authored by Jason Madden's avatar Jason Madden

Simplify threadpool worker tracking.

Keep the actual greenlet objects, not just a counter. This way we can clean up after them better.
parent c4b3da12
......@@ -146,3 +146,15 @@ class Queue(object):
self._not_empty.wait()
item = self._queue.popleft()
return item
def kill(self):
"""
Call to destroy this object.
Use this when it's not possible to safely drain the queue, e.g.,
after a fork when the locks are in an uncertain state.
"""
self._queue = None
self._mutex = None
self._not_empty = None
self.unfinished_tasks = None
......@@ -94,4 +94,7 @@ except SkipTest as e:
print("Ran 0 tests in 0.0s")
print('OK (skipped=0)')
finally:
os.remove(temp_path)
try:
os.remove(temp_path)
except OSError:
pass
......@@ -33,9 +33,9 @@ except (ImportError, OSError, IOError):
pass
TIMEOUT = 100
NWORKERS = int(os.environ.get('NWORKERS') or max(cpu_count() - 1, 4))
if NWORKERS > 10:
NWORKERS = 10
DEFAULT_NWORKERS = int(os.environ.get('NWORKERS') or max(cpu_count() - 1, 4))
if DEFAULT_NWORKERS > 10:
DEFAULT_NWORKERS = 10
if RUN_LEAKCHECKS:
# Capturing the stats takes time, and we run each
......@@ -49,9 +49,7 @@ DEFAULT_RUN_OPTIONS = {
if RUNNING_ON_CI:
# Too many and we get spurious timeouts
NWORKERS = 4
DEFAULT_NWORKERS = 4
def _package_relative_filename(filename, package):
......@@ -96,7 +94,8 @@ class Runner(object):
configured_failing_tests=(),
failfast=False,
quiet=False,
configured_run_alone_tests=()):
configured_run_alone_tests=(),
worker_count=DEFAULT_NWORKERS):
"""
:keyword quiet: Set to True or False to explicitly choose. Set to
`None` to use the default, which may come from the environment variable
......@@ -112,7 +111,7 @@ class Runner(object):
self.results.total = len(self._tests)
self._running_jobs = []
self._worker_count = min(len(tests), NWORKERS) or 1
self._worker_count = min(len(tests), worker_count) or 1
def _run_one(self, cmd, **kwargs):
if self._quiet is not None:
......@@ -516,6 +515,11 @@ def main():
parser.add_argument("--verbose", action="store_false", dest='quiet')
parser.add_argument("--debug", action="store_true", default=False)
parser.add_argument("--package", default="gevent.tests")
parser.add_argument(
"--processes", "-j", default=DEFAULT_NWORKERS, type=int,
help="Use up to the given number of parallel processes to execute tests. "
"Defaults to %(default)s."
)
parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
action='store', type=parse_resources,
help='specify which special resource intensive tests '
......@@ -614,6 +618,7 @@ def main():
failfast=options.failfast,
quiet=options.quiet,
configured_run_alone_tests=RUN_ALONE,
worker_count=options.processes,
)
if options.travis_fold:
......
from __future__ import print_function
import gevent.monkey
gevent.monkey.patch_all()
from gevent import monkey
monkey.patch_all()
import os
import unittest
import multiprocessing
import gevent
......@@ -20,29 +21,42 @@ fork_watcher = hub.loop.fork(ref=False)
fork_watcher.start(on_fork)
def run(q):
def in_child(q):
# libev only calls fork callbacks at the beginning of
# the loop; we use callbacks extensively so it takes *two*
# calls to sleep (with a timer) to actually get wrapped
# around to the beginning of the loop.
gevent.sleep(0.01)
gevent.sleep(0.01)
gevent.sleep(0.001)
gevent.sleep(0.001)
q.put(newpid)
def test():
# Use a thread to make us multi-threaded
hub.threadpool.apply(lambda: None)
# If the Queue is global, q.get() hangs on Windows; must pass as
# an argument.
q = multiprocessing.Queue()
p = multiprocessing.Process(target=run, args=(q,))
p.start()
p.join()
p_val = q.get()
class Test(unittest.TestCase):
assert p_val is not None, "The fork watcher didn't run"
assert p_val != pid
def test(self):
self.assertEqual(hub.threadpool.size, 0)
# Use a thread to make us multi-threaded
hub.threadpool.apply(lambda: None)
self.assertEqual(hub.threadpool.size, 1)
# If the Queue is global, q.get() hangs on Windows; must pass as
# an argument.
q = multiprocessing.Queue()
p = multiprocessing.Process(target=in_child, args=(q,))
p.start()
p.join()
p_val = q.get()
self.assertIsNone(
newpid,
"The fork watcher ran in the parent for some reason."
)
self.assertIsNotNone(
p_val,
"The child process returned nothing, meaning the fork watcher didn't run in the child."
)
self.assertNotEqual(p_val, pid)
assert p_val != pid
if __name__ == '__main__':
# Must call for Windows to fork properly; the fork can't be in the top-level
......@@ -57,4 +71,4 @@ if __name__ == '__main__':
# to create a whole new process that has no relation to the current process;
# that process then calls multiprocessing.forking.main() to do its work.
# Since no state is shared, a fork watcher cannot exist in that process.
test()
unittest.main()
......@@ -19,57 +19,68 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Test that modules in gevent.green package are indeed green.
To do that spawn a green server and then access it using a green socket.
If either operation blocked the whole script would block and timeout.
"""
Trivial test that a single process (and single thread) can both read
and write from green sockets (when monkey patched).
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from gevent import monkey
monkey.patch_all()
import gevent.testing as greentest
try:
from urllib import request as urllib2
from http import server as BaseHTTPServer
from http.server import HTTPServer
from http.server import SimpleHTTPRequestHandler
except ImportError:
# Python 2
import urllib2
import BaseHTTPServer
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import gevent
from gevent.testing import params
class QuietHandler(SimpleHTTPRequestHandler, object):
class TestGreenness(greentest.TestCase):
check_totalrefcount = False
def log_message(self, *args): # pylint:disable=arguments-differ
self.server.messages += ((args,),)
def setUp(self):
server_address = params.DEFAULT_BIND_ADDR_TUPLE
BaseHTTPServer.BaseHTTPRequestHandler.protocol_version = "HTTP/1.0"
self.httpd = BaseHTTPServer.HTTPServer(server_address,
SimpleHTTPRequestHandler)
self.httpd.request_count = 0
class Server(HTTPServer, object):
def tearDown(self):
self.httpd.server_close()
self.httpd = None
messages = ()
requests_handled = 0
def serve(self):
self.httpd.handle_request()
self.httpd.request_count += 1
def __init__(self):
HTTPServer.__init__(self,
params.DEFAULT_BIND_ADDR_TUPLE,
QuietHandler)
def handle_request(self):
HTTPServer.handle_request(self)
self.requests_handled += 1
class TestGreenness(greentest.TestCase):
check_totalrefcount = False
def test_urllib2(self):
server = gevent.spawn(self.serve)
httpd = Server()
server_greenlet = gevent.spawn(httpd.handle_request)
port = self.httpd.socket.getsockname()[1]
port = httpd.socket.getsockname()[1]
rsp = urllib2.urlopen('http://127.0.0.1:%s' % port)
rsp.read()
rsp.close()
server.join()
self.assertEqual(self.httpd.request_count, 1)
server_greenlet.join()
self.assertEqual(httpd.requests_handled, 1)
httpd.server_close()
if __name__ == '__main__':
......
This diff is collapsed.
......@@ -154,6 +154,10 @@ def format_run_info(thread_stacks=True,
return lines
def is_idle_threadpool_worker(frame):
return frame.f_locals and frame.f_locals.get('gevent_threadpool_worker_idle')
def _format_thread_info(lines, thread_stacks, limit, current_thread_ident):
import threading
......@@ -172,7 +176,7 @@ def _format_thread_info(lines, thread_stacks, limit, current_thread_ident):
if not thread:
# Is it an idle threadpool thread? thread pool threads
# don't have a Thread object, they're low-level
if frame.f_locals and frame.f_locals.get('gevent_threadpool_worker_idle'):
if is_idle_threadpool_worker(frame):
name = 'idle threadpool worker'
do_stacks = False
else:
......@@ -633,3 +637,15 @@ class assert_switches(object):
message += '\n'
message += '\n'.join(report_lines)
raise _FailedToSwitch(message)
def clear_stack_frames(frame):
"""Do our best to clear local variables in all frames in a stack."""
# On Python 3, frames have a .clear() method that can raise a RuntimeError.
while frame is not None:
try:
frame.clear()
except (RuntimeError, AttributeError):
pass
frame.f_locals.clear()
frame = frame.f_back
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment