Commit 13e96cc5 authored by Antoine Pitrou's avatar Antoine Pitrou Committed by GitHub

Fix bpo-30596: Add close() method to multiprocessing.Process (#2010)

* Fix bpo-30596: Add close() method to multiprocessing.Process

* Raise ValueError if close() is called before the Process is finished running

* Add docs

* Add NEWS blurb
parent 0ee32c14
...@@ -598,6 +598,16 @@ The :mod:`multiprocessing` package mostly replicates the API of the ...@@ -598,6 +598,16 @@ The :mod:`multiprocessing` package mostly replicates the API of the
acquired a lock or semaphore etc. then terminating it is liable to acquired a lock or semaphore etc. then terminating it is liable to
cause other processes to deadlock. cause other processes to deadlock.
.. method:: close()
Close the :class:`Process` object, releasing all resources associated
with it. :exc:`ValueError` is raised if the underlying process
is still running. Once :meth:`close` returns successfully, most
other methods and attributes of the :class:`Process` object will
raise :exc:`ValueError`.
.. versionadded:: 3.7
Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`, Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`,
:meth:`terminate` and :attr:`exitcode` methods should only be called by :meth:`terminate` and :attr:`exitcode` methods should only be called by
the process that created the process object. the process that created the process object.
......
...@@ -210,8 +210,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): ...@@ -210,8 +210,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
else: else:
assert os.WIFEXITED(sts) assert os.WIFEXITED(sts)
returncode = os.WEXITSTATUS(sts) returncode = os.WEXITSTATUS(sts)
# Write the exit code to the pipe # Send exit code to client process
try:
write_signed(child_w, returncode) write_signed(child_w, returncode)
except BrokenPipeError:
# client vanished
pass
os.close(child_w) os.close(child_w)
else: else:
# This shouldn't happen really # This shouldn't happen really
...@@ -241,8 +245,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None): ...@@ -241,8 +245,12 @@ def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
finally: finally:
os._exit(code) os._exit(code)
else: else:
# Send pid to client processes # Send pid to client process
try:
write_signed(child_w, pid) write_signed(child_w, pid)
except BrokenPipeError:
# client vanished
pass
pid_to_fd[pid] = child_w pid_to_fd[pid] = child_w
os.close(child_r) os.close(child_r)
for fd in fds: for fd in fds:
......
...@@ -17,6 +17,7 @@ class Popen(object): ...@@ -17,6 +17,7 @@ class Popen(object):
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
self.returncode = None self.returncode = None
self.finalizer = None
self._launch(process_obj) self._launch(process_obj)
def duplicate_for_child(self, fd): def duplicate_for_child(self, fd):
...@@ -70,5 +71,9 @@ class Popen(object): ...@@ -70,5 +71,9 @@ class Popen(object):
os._exit(code) os._exit(code)
else: else:
os.close(child_w) os.close(child_w)
util.Finalize(self, os.close, (parent_r,)) self.finalizer = util.Finalize(self, os.close, (parent_r,))
self.sentinel = parent_r self.sentinel = parent_r
def close(self):
if self.finalizer is not None:
self.finalizer()
...@@ -49,7 +49,7 @@ class Popen(popen_fork.Popen): ...@@ -49,7 +49,7 @@ class Popen(popen_fork.Popen):
set_spawning_popen(None) set_spawning_popen(None)
self.sentinel, w = forkserver.connect_to_new_process(self._fds) self.sentinel, w = forkserver.connect_to_new_process(self._fds)
util.Finalize(self, os.close, (self.sentinel,)) self.finalizer = util.Finalize(self, os.close, (self.sentinel,))
with open(w, 'wb', closefd=True) as f: with open(w, 'wb', closefd=True) as f:
f.write(buf.getbuffer()) f.write(buf.getbuffer())
self.pid = forkserver.read_signed(self.sentinel) self.pid = forkserver.read_signed(self.sentinel)
......
...@@ -62,7 +62,7 @@ class Popen(popen_fork.Popen): ...@@ -62,7 +62,7 @@ class Popen(popen_fork.Popen):
f.write(fp.getbuffer()) f.write(fp.getbuffer())
finally: finally:
if parent_r is not None: if parent_r is not None:
util.Finalize(self, os.close, (parent_r,)) self.finalizer = util.Finalize(self, os.close, (parent_r,))
for fd in (child_r, child_w, parent_w): for fd in (child_r, child_w, parent_w):
if fd is not None: if fd is not None:
os.close(fd) os.close(fd)
...@@ -56,7 +56,7 @@ class Popen(object): ...@@ -56,7 +56,7 @@ class Popen(object):
self.returncode = None self.returncode = None
self._handle = hp self._handle = hp
self.sentinel = int(hp) self.sentinel = int(hp)
util.Finalize(self, _winapi.CloseHandle, (self.sentinel,)) self.finalizer = util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
# send information to child # send information to child
set_spawning_popen(self) set_spawning_popen(self)
...@@ -96,3 +96,6 @@ class Popen(object): ...@@ -96,3 +96,6 @@ class Popen(object):
except OSError: except OSError:
if self.wait(timeout=1.0) is None: if self.wait(timeout=1.0) is None:
raise raise
def close(self):
self.finalizer()
...@@ -76,6 +76,7 @@ class BaseProcess(object): ...@@ -76,6 +76,7 @@ class BaseProcess(object):
self._config = _current_process._config.copy() self._config = _current_process._config.copy()
self._parent_pid = os.getpid() self._parent_pid = os.getpid()
self._popen = None self._popen = None
self._closed = False
self._target = target self._target = target
self._args = tuple(args) self._args = tuple(args)
self._kwargs = dict(kwargs) self._kwargs = dict(kwargs)
...@@ -85,6 +86,10 @@ class BaseProcess(object): ...@@ -85,6 +86,10 @@ class BaseProcess(object):
self.daemon = daemon self.daemon = daemon
_dangling.add(self) _dangling.add(self)
def _check_closed(self):
if self._closed:
raise ValueError("process object is closed")
def run(self): def run(self):
''' '''
Method to be run in sub-process; can be overridden in sub-class Method to be run in sub-process; can be overridden in sub-class
...@@ -96,6 +101,7 @@ class BaseProcess(object): ...@@ -96,6 +101,7 @@ class BaseProcess(object):
''' '''
Start child process Start child process
''' '''
self._check_closed()
assert self._popen is None, 'cannot start a process twice' assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \ assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process' 'can only start a process object created by current process'
...@@ -110,12 +116,14 @@ class BaseProcess(object): ...@@ -110,12 +116,14 @@ class BaseProcess(object):
''' '''
Terminate process; sends SIGTERM signal or uses TerminateProcess() Terminate process; sends SIGTERM signal or uses TerminateProcess()
''' '''
self._check_closed()
self._popen.terminate() self._popen.terminate()
def join(self, timeout=None): def join(self, timeout=None):
''' '''
Wait until child process terminates Wait until child process terminates
''' '''
self._check_closed()
assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._parent_pid == os.getpid(), 'can only join a child process'
assert self._popen is not None, 'can only join a started process' assert self._popen is not None, 'can only join a started process'
res = self._popen.wait(timeout) res = self._popen.wait(timeout)
...@@ -126,6 +134,7 @@ class BaseProcess(object): ...@@ -126,6 +134,7 @@ class BaseProcess(object):
''' '''
Return whether process is alive Return whether process is alive
''' '''
self._check_closed()
if self is _current_process: if self is _current_process:
return True return True
assert self._parent_pid == os.getpid(), 'can only test a child process' assert self._parent_pid == os.getpid(), 'can only test a child process'
...@@ -134,6 +143,23 @@ class BaseProcess(object): ...@@ -134,6 +143,23 @@ class BaseProcess(object):
self._popen.poll() self._popen.poll()
return self._popen.returncode is None return self._popen.returncode is None
def close(self):
'''
Close the Process object.
This method releases resources held by the Process object. It is
an error to call this method if the child process is still running.
'''
if self._popen is not None:
if self._popen.poll() is None:
raise ValueError("Cannot close a process while it is still running. "
"You should first call join() or terminate().")
self._popen.close()
self._popen = None
del self._sentinel
_children.discard(self)
self._closed = True
@property @property
def name(self): def name(self):
return self._name return self._name
...@@ -174,6 +200,7 @@ class BaseProcess(object): ...@@ -174,6 +200,7 @@ class BaseProcess(object):
''' '''
Return exit code of process or `None` if it has yet to stop Return exit code of process or `None` if it has yet to stop
''' '''
self._check_closed()
if self._popen is None: if self._popen is None:
return self._popen return self._popen
return self._popen.poll() return self._popen.poll()
...@@ -183,6 +210,7 @@ class BaseProcess(object): ...@@ -183,6 +210,7 @@ class BaseProcess(object):
''' '''
Return identifier (PID) of process or `None` if it has yet to start Return identifier (PID) of process or `None` if it has yet to start
''' '''
self._check_closed()
if self is _current_process: if self is _current_process:
return os.getpid() return os.getpid()
else: else:
...@@ -196,6 +224,7 @@ class BaseProcess(object): ...@@ -196,6 +224,7 @@ class BaseProcess(object):
Return a file descriptor (Unix) or handle (Windows) suitable for Return a file descriptor (Unix) or handle (Windows) suitable for
waiting for process termination. waiting for process termination.
''' '''
self._check_closed()
try: try:
return self._sentinel return self._sentinel
except AttributeError: except AttributeError:
...@@ -204,6 +233,8 @@ class BaseProcess(object): ...@@ -204,6 +233,8 @@ class BaseProcess(object):
def __repr__(self): def __repr__(self):
if self is _current_process: if self is _current_process:
status = 'started' status = 'started'
elif self._closed:
status = 'closed'
elif self._parent_pid != os.getpid(): elif self._parent_pid != os.getpid():
status = 'unknown' status = 'unknown'
elif self._popen is None: elif self._popen is None:
...@@ -295,6 +326,7 @@ class _MainProcess(BaseProcess): ...@@ -295,6 +326,7 @@ class _MainProcess(BaseProcess):
self._name = 'MainProcess' self._name = 'MainProcess'
self._parent_pid = None self._parent_pid = None
self._popen = None self._popen = None
self._closed = False
self._config = {'authkey': AuthenticationString(os.urandom(32)), self._config = {'authkey': AuthenticationString(os.urandom(32)),
'semprefix': '/mp'} 'semprefix': '/mp'}
# Note that some versions of FreeBSD only allow named # Note that some versions of FreeBSD only allow named
...@@ -307,6 +339,9 @@ class _MainProcess(BaseProcess): ...@@ -307,6 +339,9 @@ class _MainProcess(BaseProcess):
# Everything in self._config will be inherited by descendant # Everything in self._config will be inherited by descendant
# processes. # processes.
def close(self):
pass
_current_process = _MainProcess() _current_process = _MainProcess()
_process_counter = itertools.count(1) _process_counter = itertools.count(1)
......
...@@ -403,6 +403,42 @@ class _TestProcess(BaseTestCase): ...@@ -403,6 +403,42 @@ class _TestProcess(BaseTestCase):
p.join() p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=1)) self.assertTrue(wait_for_handle(sentinel, timeout=1))
@classmethod
def _test_close(cls, rc=0, q=None):
if q is not None:
q.get()
sys.exit(rc)
def test_close(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))
q = self.Queue()
p = self.Process(target=self._test_close, kwargs={'q': q})
p.daemon = True
p.start()
self.assertEqual(p.is_alive(), True)
# Child is still alive, cannot close
with self.assertRaises(ValueError):
p.close()
q.put(None)
p.join()
self.assertEqual(p.is_alive(), False)
self.assertEqual(p.exitcode, 0)
p.close()
with self.assertRaises(ValueError):
p.is_alive()
with self.assertRaises(ValueError):
p.join()
with self.assertRaises(ValueError):
p.terminate()
p.close()
wr = weakref.ref(p)
del p
gc.collect()
self.assertIs(wr(), None)
def test_many_processes(self): def test_many_processes(self):
if self.TYPE == 'threads': if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE)) self.skipTest('test not appropriate for {}'.format(self.TYPE))
......
Add a ``close()`` method to ``multiprocessing.Process``.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment