Commit b4c52966 authored by Victor Stinner's avatar Victor Stinner Committed by GitHub

bpo-26762: test_multiprocessing close more queues (#2855)

* Close explicitly queues to make sure that we don't leave dangling
  threads
* test_queue_in_process(): remove unused queue
* test_access() joins also the process to fix a random warning
parent ffb49408
...@@ -283,6 +283,7 @@ class _TestProcess(BaseTestCase): ...@@ -283,6 +283,7 @@ class _TestProcess(BaseTestCase):
self.assertEqual(p.exitcode, 0) self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False) self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children()) self.assertNotIn(p, self.active_children())
close_queue(q)
@classmethod @classmethod
def _sleep_some(cls): def _sleep_some(cls):
...@@ -461,6 +462,8 @@ class _TestProcess(BaseTestCase): ...@@ -461,6 +462,8 @@ class _TestProcess(BaseTestCase):
gc.collect() gc.collect()
self.assertIs(wr(), None) self.assertIs(wr(), None)
close_queue(q)
def test_many_processes(self): def test_many_processes(self):
if self.TYPE == 'threads': if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE)) self.skipTest('test not appropriate for {}'.format(self.TYPE))
...@@ -501,6 +504,7 @@ class _TestProcess(BaseTestCase): ...@@ -501,6 +504,7 @@ class _TestProcess(BaseTestCase):
p.join() p.join()
self.assertIs(wr(), None) self.assertIs(wr(), None)
self.assertEqual(q.get(), 5) self.assertEqual(q.get(), 5)
close_queue(q)
@classmethod @classmethod
def _test_child_fd_inflation(self, evt, q): def _test_child_fd_inflation(self, evt, q):
...@@ -536,6 +540,7 @@ class _TestProcess(BaseTestCase): ...@@ -536,6 +540,7 @@ class _TestProcess(BaseTestCase):
evt.set() evt.set()
for p in procs: for p in procs:
p.join() p.join()
close_queue(q)
# #
# #
...@@ -721,6 +726,7 @@ class _TestQueue(BaseTestCase): ...@@ -721,6 +726,7 @@ class _TestQueue(BaseTestCase):
self.assertEqual(queue_full(queue, MAXSIZE), False) self.assertEqual(queue_full(queue, MAXSIZE), False)
proc.join() proc.join()
close_queue(queue)
@classmethod @classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue): def _test_get(cls, queue, child_can_start, parent_can_continue):
...@@ -783,6 +789,7 @@ class _TestQueue(BaseTestCase): ...@@ -783,6 +789,7 @@ class _TestQueue(BaseTestCase):
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
proc.join() proc.join()
close_queue(queue)
@classmethod @classmethod
def _test_fork(cls, queue): def _test_fork(cls, queue):
...@@ -818,6 +825,7 @@ class _TestQueue(BaseTestCase): ...@@ -818,6 +825,7 @@ class _TestQueue(BaseTestCase):
self.assertRaises(pyqueue.Empty, queue.get, False) self.assertRaises(pyqueue.Empty, queue.get, False)
p.join() p.join()
close_queue(queue)
def test_qsize(self): def test_qsize(self):
q = self.Queue() q = self.Queue()
...@@ -861,6 +869,7 @@ class _TestQueue(BaseTestCase): ...@@ -861,6 +869,7 @@ class _TestQueue(BaseTestCase):
for p in workers: for p in workers:
p.join() p.join()
close_queue(queue)
def test_no_import_lock_contention(self): def test_no_import_lock_contention(self):
with test.support.temp_cwd(): with test.support.temp_cwd():
...@@ -891,6 +900,7 @@ class _TestQueue(BaseTestCase): ...@@ -891,6 +900,7 @@ class _TestQueue(BaseTestCase):
# Tolerate a delta of 30 ms because of the bad clock resolution on # Tolerate a delta of 30 ms because of the bad clock resolution on
# Windows (usually 15.6 ms) # Windows (usually 15.6 ms)
self.assertGreaterEqual(delta, 0.170) self.assertGreaterEqual(delta, 0.170)
close_queue(q)
def test_queue_feeder_donot_stop_onexc(self): def test_queue_feeder_donot_stop_onexc(self):
# bpo-30414: verify feeder handles exceptions correctly # bpo-30414: verify feeder handles exceptions correctly
...@@ -1503,6 +1513,7 @@ class _TestBarrier(BaseTestCase): ...@@ -1503,6 +1513,7 @@ class _TestBarrier(BaseTestCase):
self.run_threads(self._test_wait_return_f, (self.barrier, queue)) self.run_threads(self._test_wait_return_f, (self.barrier, queue))
results = [queue.get() for i in range(self.N)] results = [queue.get() for i in range(self.N)]
self.assertEqual(results.count(0), 1) self.assertEqual(results.count(0), 1)
close_queue(queue)
@classmethod @classmethod
def _test_action_f(cls, barrier, results): def _test_action_f(cls, barrier, results):
...@@ -3158,6 +3169,8 @@ class _TestPicklingConnections(BaseTestCase): ...@@ -3158,6 +3169,8 @@ class _TestPicklingConnections(BaseTestCase):
w.close() w.close()
self.assertEqual(conn.recv(), 'foobar'*2) self.assertEqual(conn.recv(), 'foobar'*2)
p.join()
# #
# #
# #
...@@ -3654,7 +3667,7 @@ def _this_sub_process(q): ...@@ -3654,7 +3667,7 @@ def _this_sub_process(q):
except pyqueue.Empty: except pyqueue.Empty:
pass pass
def _test_process(q): def _test_process():
queue = multiprocessing.Queue() queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
subProc.daemon = True subProc.daemon = True
...@@ -3694,8 +3707,7 @@ class _file_like(object): ...@@ -3694,8 +3707,7 @@ class _file_like(object):
class TestStdinBadfiledescriptor(unittest.TestCase): class TestStdinBadfiledescriptor(unittest.TestCase):
def test_queue_in_process(self): def test_queue_in_process(self):
queue = multiprocessing.Queue() proc = multiprocessing.Process(target=_test_process)
proc = multiprocessing.Process(target=_test_process, args=(queue,))
proc.start() proc.start()
proc.join() proc.join()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment