Commit 5458647b authored by Davin Potts's avatar Davin Potts

Issue #28053: Applying refactorings, docs and other cleanup to follow.

parent f1024f74
......@@ -20,11 +20,11 @@ import itertools
import _multiprocessing
from . import reduction
from . import util
from . import AuthenticationError, BufferTooShort
from .reduction import ForkingPickler
from .context import reduction
_ForkingPickler = reduction.ForkingPickler
try:
import _winapi
......@@ -203,7 +203,7 @@ class _ConnectionBase:
"""Send a (picklable) object"""
self._check_closed()
self._check_writable()
self._send_bytes(ForkingPickler.dumps(obj))
self._send_bytes(_ForkingPickler.dumps(obj))
def recv_bytes(self, maxlength=None):
"""
......@@ -248,7 +248,7 @@ class _ConnectionBase:
self._check_closed()
self._check_readable()
buf = self._recv_bytes()
return ForkingPickler.loads(buf.getbuffer())
return _ForkingPickler.loads(buf.getbuffer())
def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
......
......@@ -3,6 +3,7 @@ import sys
import threading
from . import process
from . import reduction
__all__ = [] # things are copied from here to __init__.py
......@@ -198,6 +199,16 @@ class BaseContext(object):
def set_start_method(self, method=None):
raise ValueError('cannot set start method of concrete context')
@property
def reducer(self):
'''Controls how objects will be reduced to a form that can be
shared with other processes.'''
return globals().get('reduction')
@reducer.setter
def reducer(self, reduction):
globals()['reduction'] = reduction
def _check_available(self):
pass
......@@ -245,7 +256,6 @@ class DefaultContext(BaseContext):
if sys.platform == 'win32':
return ['spawn']
else:
from . import reduction
if reduction.HAVE_SEND_HANDLE:
return ['fork', 'spawn', 'forkserver']
else:
......@@ -292,7 +302,6 @@ if sys.platform != 'win32':
_name = 'forkserver'
Process = ForkServerProcess
def _check_available(self):
from . import reduction
if not reduction.HAVE_SEND_HANDLE:
raise ValueError('forkserver start method not available')
......
......@@ -9,7 +9,7 @@ import threading
from . import connection
from . import process
from . import reduction
from .context import reduction
from . import semaphore_tracker
from . import spawn
from . import util
......
......@@ -14,8 +14,7 @@ import sys
import tempfile
import threading
from . import context
from . import reduction
from .context import reduction, assert_spawning
from . import util
__all__ = ['BufferWrapper']
......@@ -48,7 +47,7 @@ if sys.platform == 'win32':
self._state = (self.size, self.name)
def __getstate__(self):
context.assert_spawning(self)
assert_spawning(self)
return self._state
def __setstate__(self, state):
......
......@@ -23,10 +23,9 @@ from time import time as _time
from traceback import format_exc
from . import connection
from . import context
from .context import reduction, get_spawning_popen
from . import pool
from . import process
from . import reduction
from . import util
from . import get_context
......@@ -833,7 +832,7 @@ class BaseProxy(object):
def __reduce__(self):
kwds = {}
if context.get_spawning_popen() is not None:
if get_spawning_popen() is not None:
kwds['authkey'] = self._authkey
if getattr(self, '_isauto', False):
......
import io
import os
from . import reduction
from .context import reduction, set_spawning_popen
if not reduction.HAVE_SEND_HANDLE:
raise ImportError('No support for sending fds between processes')
from . import context
from . import forkserver
from . import popen_fork
from . import spawn
......@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
def _launch(self, process_obj):
prep_data = spawn.get_preparation_data(process_obj._name)
buf = io.BytesIO()
context.set_spawning_popen(self)
set_spawning_popen(self)
try:
reduction.dump(prep_data, buf)
reduction.dump(process_obj, buf)
finally:
context.set_spawning_popen(None)
set_spawning_popen(None)
self.sentinel, w = forkserver.connect_to_new_process(self._fds)
util.Finalize(self, os.close, (self.sentinel,))
......
import io
import os
from . import context
from .context import reduction, set_spawning_popen
from . import popen_fork
from . import reduction
from . import spawn
from . import util
......@@ -42,12 +41,12 @@ class Popen(popen_fork.Popen):
self._fds.append(tracker_fd)
prep_data = spawn.get_preparation_data(process_obj._name)
fp = io.BytesIO()
context.set_spawning_popen(self)
set_spawning_popen(self)
try:
reduction.dump(prep_data, fp)
reduction.dump(process_obj, fp)
finally:
context.set_spawning_popen(None)
set_spawning_popen(None)
parent_r = child_w = child_r = parent_w = None
try:
......
......@@ -4,9 +4,8 @@ import signal
import sys
import _winapi
from . import context
from .context import reduction, get_spawning_popen, set_spawning_popen
from . import spawn
from . import reduction
from . import util
__all__ = ['Popen']
......@@ -60,15 +59,15 @@ class Popen(object):
util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
# send information to child
context.set_spawning_popen(self)
set_spawning_popen(self)
try:
reduction.dump(prep_data, to_child)
reduction.dump(process_obj, to_child)
finally:
context.set_spawning_popen(None)
set_spawning_popen(None)
def duplicate_for_child(self, handle):
assert self is context.get_spawning_popen()
assert self is get_spawning_popen()
return reduction.duplicate(handle, self.sentinel)
def wait(self, timeout=None):
......
......@@ -23,9 +23,9 @@ import _multiprocessing
from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler
from .util import debug, info, Finalize, register_after_fork, is_exiting
from .reduction import ForkingPickler
#
# Queue type using a pipe, buffer and thread
......@@ -110,7 +110,7 @@ class Queue(object):
finally:
self._rlock.release()
# unserialize the data after having released the lock
return ForkingPickler.loads(res)
return _ForkingPickler.loads(res)
def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
......@@ -238,7 +238,7 @@ class Queue(object):
return
# serialize the data before acquiring the lock
obj = ForkingPickler.dumps(obj)
obj = _ForkingPickler.dumps(obj)
if wacquire is None:
send_bytes(obj)
else:
......@@ -342,11 +342,11 @@ class SimpleQueue(object):
with self._rlock:
res = self._reader.recv_bytes()
# unserialize the data after having released the lock
return ForkingPickler.loads(res)
return _ForkingPickler.loads(res)
def put(self, obj):
# serialize the data before acquiring the lock
obj = ForkingPickler.dumps(obj)
obj = _ForkingPickler.dumps(obj)
if self._wlock is None:
# writes to a message oriented win32 pipe are atomic
self._writer.send_bytes(obj)
......
......@@ -7,6 +7,7 @@
# Licensed to PSF under a Contributor Agreement.
#
from abc import ABCMeta, abstractmethod
import copyreg
import functools
import io
......@@ -238,3 +239,36 @@ else:
fd = df.detach()
return socket.socket(family, type, proto, fileno=fd)
register(socket.socket, _reduce_socket)
class AbstractReducer(metaclass=ABCMeta):
'''Abstract base class for use in implementing a Reduction class
suitable for use in replacing the standard reduction mechanism
used in multiprocessing.'''
ForkingPickler = ForkingPickler
register = register
dump = dump
send_handle = send_handle
recv_handle = recv_handle
if sys.platform == 'win32':
steal_handle = steal_handle
duplicate = duplicate
DupHandle = DupHandle
else:
sendfds = sendfds
recvfds = recvfds
DupFd = DupFd
_reduce_method = _reduce_method
_reduce_method_descriptor = _reduce_method_descriptor
_rebuild_partial = _rebuild_partial
_reduce_socket = _reduce_socket
_rebuild_socket = _rebuild_socket
def __init__(self, *args):
register(type(_C().f), _reduce_method)
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
register(functools.partial, _reduce_partial)
register(socket.socket, _reduce_socket)
......@@ -15,7 +15,7 @@ import sys
import threading
from . import process
from . import reduction
from .context import reduction
from . import util
__all__ = ['stop']
......
......@@ -13,8 +13,8 @@ import weakref
from . import heap
from . import get_context
from .context import assert_spawning
from .reduction import ForkingPickler
from .context import reduction, assert_spawning
_ForkingPickler = reduction.ForkingPickler
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
......@@ -134,7 +134,7 @@ def reduce_ctype(obj):
def rebuild_ctype(type_, wrapper, length):
if length is not None:
type_ = type_ * length
ForkingPickler.register(type_, reduce_ctype)
_ForkingPickler.register(type_, reduce_ctype)
buf = wrapper.create_memoryview()
obj = type_.from_buffer(buf)
obj._wrapper = wrapper
......
......@@ -9,13 +9,13 @@
#
import os
import pickle
import sys
import runpy
import types
from . import get_start_method, set_start_method
from . import process
from .context import reduction
from . import util
__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
......@@ -96,8 +96,7 @@ def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
assert is_forking(sys.argv)
if sys.platform == 'win32':
import msvcrt
from .reduction import steal_handle
new_handle = steal_handle(parent_pid, pipe_handle)
new_handle = reduction.steal_handle(parent_pid, pipe_handle)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
else:
from . import semaphore_tracker
......@@ -111,9 +110,9 @@ def _main(fd):
with os.fdopen(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True
try:
preparation_data = pickle.load(from_parent)
preparation_data = reduction.pickle.load(from_parent)
prepare(preparation_data)
self = pickle.load(from_parent)
self = reduction.pickle.load(from_parent)
finally:
del process.current_process()._inheriting
return self._bootstrap()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment