Commit bace3b9a authored by Antoine Pitrou's avatar Antoine Pitrou

Add tests for the atexit hook in concurrent.futures (part of #11635)

parents ea8b024d aebac0b5
......@@ -9,6 +9,9 @@ test.support.import_module('multiprocessing.synchronize')
# without thread support.
test.support.import_module('threading')
from test.script_helper import assert_python_ok
import sys
import threading
import time
import unittest
......@@ -43,9 +46,30 @@ def sleep_and_raise(t):
time.sleep(t)
raise Exception('this is an exception')
def sleep_and_print(t, msg):
time.sleep(t)
print(msg)
sys.stdout.flush()
class ExecutorMixin:
worker_count = 5
def setUp(self):
self.t1 = time.time()
try:
self.executor = self.executor_type(max_workers=self.worker_count)
except NotImplementedError as e:
self.skipTest(str(e))
self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
dt = time.time() - self.t1
if test.support.verbose:
print("%.2fs" % dt, end=' ')
self.assertLess(dt, 60, "synchronization issue: test lasted too long")
def _prime_executor(self):
# Make sure that the executor is ready to do work before running the
# tests. This should reduce the probability of timeouts in the tests.
......@@ -57,24 +81,11 @@ class ExecutorMixin:
class ThreadPoolMixin(ExecutorMixin):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=5)
self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
executor_type = futures.ThreadPoolExecutor
class ProcessPoolMixin(ExecutorMixin):
def setUp(self):
try:
self.executor = futures.ProcessPoolExecutor(max_workers=5)
except NotImplementedError as e:
self.skipTest(str(e))
self._prime_executor()
def tearDown(self):
self.executor.shutdown(wait=True)
executor_type = futures.ProcessPoolExecutor
class ExecutorShutdownTest(unittest.TestCase):
......@@ -84,6 +95,20 @@ class ExecutorShutdownTest(unittest.TestCase):
self.executor.submit,
pow, 2, 5)
def test_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
rc, out, err = assert_python_ok('-c', """if 1:
from concurrent.futures import {executor_type}
from time import sleep
from test.test_concurrent_futures import sleep_and_print
t = {executor_type}(5)
t.submit(sleep_and_print, 1.0, "apple")
""".format(executor_type=self.executor_type.__name__))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
def _prime_executor(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment