Commit 7d2d43c0 authored by Richard Oudkerk's avatar Richard Oudkerk

Stop making fork server have copy of semaphore_tracker_fd.

parent 0718f701
......@@ -10,6 +10,7 @@ import threading
from . import connection
from . import process
from . import reduction
from . import semaphore_tracker
from . import spawn
from . import util
......@@ -55,13 +56,14 @@ def connect_to_new_process(fds):
The calling process should write to data_w the pickled preparation and
process data.
'''
if len(fds) + 3 >= MAXFDS_TO_SEND:
if len(fds) + 4 >= MAXFDS_TO_SEND:
raise ValueError('too many fds')
with socket.socket(socket.AF_UNIX) as client:
client.connect(_forkserver_address)
parent_r, child_w = util.pipe()
child_r, parent_w = util.pipe()
allfds = [child_r, child_w, _forkserver_alive_fd]
allfds = [child_r, child_w, _forkserver_alive_fd,
semaphore_tracker._semaphore_tracker_fd]
allfds += fds
try:
reduction.sendfds(client, allfds)
......@@ -88,8 +90,6 @@ def ensure_running():
return
assert all(type(mod) is str for mod in _preload_modules)
config = process.current_process()._config
semaphore_tracker_fd = config['semaphore_tracker_fd']
cmd = ('from multiprocessing.forkserver import main; ' +
'main(%d, %d, %r, **%r)')
......@@ -110,7 +110,7 @@ def ensure_running():
# when they all terminate the read end becomes ready.
alive_r, alive_w = util.pipe()
try:
fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
fds_to_pass = [listener.fileno(), alive_r]
cmd %= (listener.fileno(), alive_r, _preload_modules, data)
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
......@@ -197,7 +197,8 @@ def _serve_one(s, listener, alive_r, handler):
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
s.close()
assert len(fds) <= MAXFDS_TO_SEND
child_r, child_w, _forkserver_alive_fd, *_inherited_fds = fds
child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds
semaphore_tracker._semaphore_tracker_fd = stfd
# send pid to client processes
write_unsigned(child_w, os.getpid())
......
......@@ -40,7 +40,8 @@ class Popen(popen_fork.Popen):
return fd
def _launch(self, process_obj):
tracker_fd = current_process()._config['semaphore_tracker_fd']
from . import semaphore_tracker
tracker_fd = semaphore_tracker._semaphore_tracker_fd
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
......@@ -55,7 +56,8 @@ class Popen(popen_fork.Popen):
try:
parent_r, child_w = util.pipe()
child_r, parent_w = util.pipe()
cmd = spawn.get_command_line() + [str(child_r)]
cmd = spawn.get_command_line(tracker_fd=tracker_fd,
pipe_handle=child_r)
self._fds.extend([child_r, child_w])
self.pid = util.spawnv_passfds(spawn.get_executable(),
cmd, self._fds)
......
......@@ -32,13 +32,14 @@ class Popen(object):
def __init__(self, process_obj):
prep_data = spawn.get_preparation_data(process_obj._name)
cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
# read end of pipe will be "stolen" by the child process
# -- see spawn_main() in spawn.py.
rhandle, whandle = _winapi.CreatePipe(None, 0)
wfd = msvcrt.open_osfhandle(whandle, 0)
cmd += ' {} {}'.format(os.getpid(), rhandle)
cmd = spawn.get_command_line(parent_pid=os.getpid(),
pipe_handle=rhandle)
cmd = ' '.join('"%s"' % x for x in cmd)
with open(wfd, 'wb', closefd=True) as to_child:
# start process
......
......@@ -26,6 +26,7 @@ from . import current_process
__all__ = ['ensure_running', 'register', 'unregister']
_semaphore_tracker_fd = None
_lock = threading.Lock()
......@@ -34,9 +35,9 @@ def ensure_running():
This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
global _semaphore_tracker_fd
with _lock:
config = current_process()._config
if config.get('semaphore_tracker_fd') is not None:
if _semaphore_tracker_fd is not None:
return
fds_to_pass = []
try:
......@@ -44,7 +45,7 @@ def ensure_running():
except Exception:
pass
cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
r, semaphore_tracker_fd = util.pipe()
r, w = util.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
......@@ -53,10 +54,10 @@ def ensure_running():
args += ['-c', cmd % r]
util.spawnv_passfds(exe, args, fds_to_pass)
except:
os.close(semaphore_tracker_fd)
os.close(w)
raise
else:
config['semaphore_tracker_fd'] = semaphore_tracker_fd
_semaphore_tracker_fd = w
finally:
os.close(r)
......@@ -77,8 +78,7 @@ def _send(cmd, name):
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('name too long')
fd = current_process()._config['semaphore_tracker_fd']
nbytes = os.write(fd, msg)
nbytes = os.write(_semaphore_tracker_fd, msg)
assert nbytes == len(msg)
......
......@@ -66,32 +66,33 @@ def freeze_support():
sys.exit()
def get_command_line():
def get_command_line(**kwds):
'''
Returns prefix of command line used for spawning a child process
'''
if getattr(sys, 'frozen', False):
return [sys.executable, '--multiprocessing-fork']
else:
prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
prog = 'from multiprocessing.spawn import spawn_main; spawn_main(%s)'
prog %= ', '.join('%s=%r' % item for item in kwds.items())
opts = util._args_from_interpreter_flags()
return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
def spawn_main():
def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
'''
Run code specifed by data received over pipe
'''
assert is_forking(sys.argv)
handle = int(sys.argv[-1])
if sys.platform == 'win32':
import msvcrt
from .reduction import steal_handle
pid = int(sys.argv[-2])
new_handle = steal_handle(pid, handle)
new_handle = steal_handle(parent_pid, pipe_handle)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
fd = handle
from . import semaphore_tracker
semaphore_tracker._semaphore_tracker_fd = tracker_fd
fd = pipe_handle
exitcode = _main(fd)
sys.exit(exitcode)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment