5438ed15
Commit
5438ed15
authored
Apr 24, 2012
by
Antoine Pitrou
Issue #4892: multiprocessing Connections can now be transferred over multiprocessing Connections.
Patch by Richard Oudkerk (sbt).
parent 9f478c02
Showing 8 changed files with 283 additions and 193 deletions
Doc/library/multiprocessing.rst        +4    -0
Lib/multiprocessing/__init__.py        +3    -1
Lib/multiprocessing/connection.py      +22   -2
Lib/multiprocessing/forking.py         +0    -19
Lib/multiprocessing/reduction.py       +178  -137
Lib/test/test_multiprocessing.py       +69   -34
Misc/NEWS                              +3    -0
Modules/_winapi.c                      +4    -0
Doc/library/multiprocessing.rst

@@ -832,6 +832,10 @@ Connection objects are usually created using :func:`Pipe` -- see also

        raised and the complete message is available as ``e.args[0]`` where ``e``
        is the exception instance.

+   .. versionchanged:: 3.3
+      Connection objects themselves can now be transferred between processes
+      using :meth:`Connection.send` and :meth:`Connection.recv`.
+
     For example:
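The documentation's own example is collapsed in this view. As an illustrative sketch of the behaviour the versionchanged note describes (not the text of the docs), one end of a newly created pipe can now be handed to a running child process through an existing connection:

    from multiprocessing import Pipe, Process

    def worker(conn):
        extra = conn.recv()                  # a Connection object arrives
        extra.send('sent through a transferred connection')
        extra.close()

    if __name__ == '__main__':
        parent, child = Pipe()
        p = Process(target=worker, args=(child,))
        p.start()
        r, w = Pipe(duplex=False)
        parent.send(w)                       # new in 3.3: the Connection itself is picklable here
        w.close()
        print(r.recv())
        p.join()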
Lib/multiprocessing/__init__.py

@@ -161,7 +161,9 @@ def allow_connection_pickling():
     '''
     Install support for sending connections and sockets between processes
     '''
-    from multiprocessing import reduction
+    # This is undocumented.  In previous versions of multiprocessing
+    # its only effect was to make socket objects inheritable on Windows.
+    import multiprocessing.connection

 #
 # Definitions depending on native semaphores
 #
Lib/multiprocessing/connection.py

@@ -50,6 +50,7 @@ import _multiprocessing

 from multiprocessing import current_process, AuthenticationError, BufferTooShort
 from multiprocessing.util import (
     get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
+from multiprocessing.forking import ForkingPickler

 try:
     import _winapi
     from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE

@@ -227,8 +228,9 @@ class _ConnectionBase:
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
-        self._send_bytes(memoryview(buf))
+        buf = io.BytesIO()
+        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+        self._send_bytes(buf.getbuffer())

     def recv_bytes(self, maxlength=None):
         """

@@ -880,3 +882,21 @@ else:
                 raise
             if timeout is not None:
                 timeout = deadline - time.time()
+
+#
+# Make connection and socket objects sharable if possible
+#
+
+if sys.platform == 'win32':
+    from . import reduction
+    ForkingPickler.register(socket.socket, reduction.reduce_socket)
+    ForkingPickler.register(Connection, reduction.reduce_connection)
+    ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+else:
+    try:
+        from . import reduction
+    except ImportError:
+        pass
+    else:
+        ForkingPickler.register(socket.socket, reduction.reduce_socket)
+        ForkingPickler.register(Connection, reduction.reduce_connection)
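With send() now routed through ForkingPickler, any type that has a reducer registered on ForkingPickler gains custom serialization when passed through a Connection; the registrations at the end of the last hunk use exactly this hook for sockets and connections. A minimal sketch of the same mechanism for a user-defined type (the Token class and its reducer below are hypothetical, not part of the patch; the import path assumes the Python 3.3 layout):

    from multiprocessing.forking import ForkingPickler

    class Token(object):
        def __init__(self, value):
            self.value = value

    def rebuild_token(value):
        return Token(value)

    def reduce_token(tok):
        # A reducer returns (callable, args), just like reduce_connection above.
        return rebuild_token, (tok.value,)

    ForkingPickler.register(Token, reduce_token)
    # After registration, conn.send(Token(42)) serializes via reduce_token.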
Lib/multiprocessing/forking.py

@@ -407,25 +407,6 @@ else:
             return d

-#
-# Make (Pipe)Connection picklable
-#
-
-# Late import because of circular import
-from .connection import Connection, PipeConnection
-
-def reduce_connection(conn):
-    if not Popen.thread_is_spawning():
-        raise RuntimeError(
-            'By default %s objects can only be shared between processes\n'
-            'using inheritance' % type(conn).__name__
-            )
-    return type(conn), (Popen.duplicate_for_child(conn.fileno()),
-                        conn.readable, conn.writable)
-
-ForkingPickler.register(Connection, reduce_connection)
-ForkingPickler.register(PipeConnection, reduce_connection)
-
 #
 # Prepare current process
 #
Lib/multiprocessing/reduction.py

@@ -33,7 +33,7 @@
 # SUCH DAMAGE.
 #

-__all__ = []
+__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']

 import os
 import sys
@@ -42,9 +42,8 @@ import threading
 import struct

 from multiprocessing import current_process
-from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
 from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener, Connection
+from multiprocessing.util import is_exiting, sub_warning

 #
@@ -60,22 +59,91 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
 #

 if sys.platform == 'win32':
+    # Windows
+    __all__ += ['reduce_pipe_connection']
     import _winapi

     def send_handle(conn, handle, destination_pid):
-        process_handle = _winapi.OpenProcess(
-            _winapi.PROCESS_ALL_ACCESS, False, destination_pid
-            )
-        try:
-            new_handle = duplicate(handle, process_handle)
-            conn.send(new_handle)
-        finally:
-            close(process_handle)
+        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+        conn.send(dh)

     def recv_handle(conn):
-        return conn.recv()
+        return conn.recv().detach()
+
+    class DupHandle(object):
+        def __init__(self, handle, access, pid=None):
+            # duplicate handle for process with given pid
+            if pid is None:
+                pid = os.getpid()
+            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+            try:
+                self._handle = _winapi.DuplicateHandle(
+                    _winapi.GetCurrentProcess(),
+                    handle, proc, access, False, 0)
+            finally:
+                _winapi.CloseHandle(proc)
+            self._access = access
+            self._pid = pid
+
+        def detach(self):
+            # retrieve handle from process which currently owns it
+            if self._pid == os.getpid():
+                return self._handle
+            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+                                       self._pid)
+            try:
+                return _winapi.DuplicateHandle(
+                    proc, self._handle, _winapi.GetCurrentProcess(),
+                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+            finally:
+                _winapi.CloseHandle(proc)
+
+    class DupSocket(object):
+        def __init__(self, sock):
+            new_sock = sock.dup()
+            def send(conn, pid):
+                share = new_sock.share(pid)
+                conn.send_bytes(share)
+            self._id = resource_sharer.register(send, new_sock.close)
+
+        def detach(self):
+            conn = resource_sharer.get_connection(self._id)
+            try:
+                share = conn.recv_bytes()
+                return socket.fromshare(share)
+            finally:
+                conn.close()
+
+    def reduce_socket(s):
+        return rebuild_socket, (DupSocket(s),)
+
+    def rebuild_socket(ds):
+        return ds.detach()
+
+    def reduce_connection(conn):
+        handle = conn.fileno()
+        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+            ds = DupSocket(s)
+            return rebuild_connection, (ds, conn.readable, conn.writable)
+
+    def rebuild_connection(ds, readable, writable):
+        from .connection import Connection
+        sock = ds.detach()
+        return Connection(sock.detach(), readable, writable)
+
+    def reduce_pipe_connection(conn):
+        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+        dh = DupHandle(conn.fileno(), access)
+        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+
+    def rebuild_pipe_connection(dh, readable, writable):
+        from .connection import PipeConnection
+        handle = dh.detach()
+        return PipeConnection(handle, readable, writable)

 else:
+    # Unix
     def send_handle(conn, handle, destination_pid):
         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
             s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
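The receiving half of the Unix descriptor transfer falls outside this hunk's context. For reference, a hedged sketch of the general SCM_RIGHTS receive pattern (illustrative only, not the module's recv_handle):

    import socket, struct

    def recv_fd(sock):
        # 'sock' is assumed to be a connected AF_UNIX stream socket.
        size = struct.calcsize("@i")
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(size))
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if (cmsg_level == socket.SOL_SOCKET and
                    cmsg_type == socket.SCM_RIGHTS):
                return struct.unpack("@i", cmsg_data[:size])[0]
        raise RuntimeError('no file descriptor received')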
@@ -94,136 +162,109 @@ else:
                 pass
         raise RuntimeError('Invalid data received')

-#
-# Support for a per-process server thread which caches pickled handles
-#
-
-_cache = set()
-
-def _reset(obj):
-    global _lock, _listener, _cache
-    for h in _cache:
-        close(h)
-    _cache.clear()
-    _lock = threading.Lock()
-    _listener = None
-
-_reset(None)
-register_after_fork(_reset, _reset)
-
-def _get_listener():
-    global _listener
-    if _listener is None:
-        _lock.acquire()
-        try:
-            if _listener is None:
-                debug('starting listener and thread for sending handles')
-                _listener = Listener(authkey=current_process().authkey)
-                t = threading.Thread(target=_serve)
-                t.daemon = True
-                t.start()
-        finally:
-            _lock.release()
-    return _listener
-
-def _serve():
-    from .util import is_exiting, sub_warning
-    while 1:
-        try:
-            conn = _listener.accept()
-            handle_wanted, destination_pid = conn.recv()
-            _cache.remove(handle_wanted)
-            send_handle(conn, handle_wanted, destination_pid)
-            close(handle_wanted)
-            conn.close()
-        except:
-            if not is_exiting():
-                import traceback
-                sub_warning(
-                    'thread for sharing handles raised exception :\n' +
-                    '-'*79 + '\n' + traceback.format_exc() + '-'*79
-                    )
-
-#
-# Functions to be used for pickling/unpickling objects with handles
-#
-
-def reduce_handle(handle):
-    if Popen.thread_is_spawning():
-        return (None, Popen.duplicate_for_child(handle), True)
-    dup_handle = duplicate(handle)
-    _cache.add(dup_handle)
-    sub_debug('reducing handle %d', handle)
-    return (_get_listener().address, dup_handle, False)
-
-def rebuild_handle(pickled_data):
-    address, handle, inherited = pickled_data
-    if inherited:
-        return handle
-    sub_debug('rebuilding handle %d', handle)
-    conn = Client(address, authkey=current_process().authkey)
-    conn.send((handle, os.getpid()))
-    new_handle = recv_handle(conn)
-    conn.close()
-    return new_handle
-
-#
-# Register `Connection` with `ForkingPickler`
-#
-
-def reduce_connection(conn):
-    rh = reduce_handle(conn.fileno())
-    return rebuild_connection, (rh, conn.readable, conn.writable)
-
-def rebuild_connection(reduced_handle, readable, writable):
-    handle = rebuild_handle(reduced_handle)
-    return Connection(handle, readable=readable, writable=writable)
-
-ForkingPickler.register(Connection, reduce_connection)
-
-#
-# Register `socket.socket` with `ForkingPickler`
-#
-
-def fromfd(fd, family, type_, proto=0):
-    s = socket.fromfd(fd, family, type_, proto)
-    if s.__class__ is not socket.socket:
-        s = socket.socket(_sock=s)
-    return s
-
-def reduce_socket(s):
-    reduced_handle = reduce_handle(s.fileno())
-    return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
-
-def rebuild_socket(reduced_handle, family, type_, proto):
-    fd = rebuild_handle(reduced_handle)
-    _sock = fromfd(fd, family, type_, proto)
-    close(fd)
-    return _sock
-
-ForkingPickler.register(socket.socket, reduce_socket)
-
-#
-# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
-#
-
-if sys.platform == 'win32':
-
-    from multiprocessing.connection import PipeConnection
-
-    def reduce_pipe_connection(conn):
-        rh = reduce_handle(conn.fileno())
-        return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
-
-    def rebuild_pipe_connection(reduced_handle, readable, writable):
-        handle = rebuild_handle(reduced_handle)
-        return PipeConnection(handle, readable=readable, writable=writable)
-
-    ForkingPickler.register(PipeConnection, reduce_pipe_connection)
+    class DupFd(object):
+        def __init__(self, fd):
+            new_fd = os.dup(fd)
+            def send(conn, pid):
+                send_handle(conn, new_fd, pid)
+            def close():
+                os.close(new_fd)
+            self._id = resource_sharer.register(send, close)
+
+        def detach(self):
+            conn = resource_sharer.get_connection(self._id)
+            try:
+                return recv_handle(conn)
+            finally:
+                conn.close()
+
+    def reduce_socket(s):
+        df = DupFd(s.fileno())
+        return rebuild_socket, (df, s.family, s.type, s.proto)
+
+    def rebuild_socket(df, family, type, proto):
+        fd = df.detach()
+        s = socket.fromfd(fd, family, type, proto)
+        os.close(fd)
+        return s
+
+    def reduce_connection(conn):
+        df = DupFd(conn.fileno())
+        return rebuild_connection, (df, conn.readable, conn.writable)
+
+    def rebuild_connection(df, readable, writable):
+        from .connection import Connection
+        fd = df.detach()
+        return Connection(fd, readable, writable)
+
+#
+# Server which shares registered resources with clients
+#
+
+class ResourceSharer(object):
+    def __init__(self):
+        self._key = 0
+        self._cache = {}
+        self._old_locks = []
+        self._lock = threading.Lock()
+        self._listener = None
+        self._address = None
+        register_after_fork(self, ResourceSharer._afterfork)
+
+    def register(self, send, close):
+        with self._lock:
+            if self._address is None:
+                self._start()
+            self._key += 1
+            self._cache[self._key] = (send, close)
+            return (self._address, self._key)
+
+    @staticmethod
+    def get_connection(ident):
+        from .connection import Client
+        address, key = ident
+        c = Client(address, authkey=current_process().authkey)
+        c.send((key, os.getpid()))
+        return c
+
+    def _afterfork(self):
+        for key, (send, close) in self._cache.items():
+            close()
+        self._cache.clear()
+        # If self._lock was locked at the time of the fork, it may be broken
+        # -- see issue 6721.  Replace it without letting it be gc'ed.
+        self._old_locks.append(self._lock)
+        self._lock = threading.Lock()
+        if self._listener is not None:
+            self._listener.close()
+        self._listener = None
+        self._address = None
+
+    def _start(self):
+        from .connection import Listener
+        assert self._listener is None
+        debug('starting listener and thread for sending handles')
+        self._listener = Listener(authkey=current_process().authkey)
+        self._address = self._listener.address
+        t = threading.Thread(target=self._serve)
+        t.daemon = True
+        t.start()
+
+    def _serve(self):
+        while 1:
+            try:
+                conn = self._listener.accept()
+                key, destination_pid = conn.recv()
+                send, close = self._cache.pop(key)
+                send(conn, destination_pid)
+                close()
+                conn.close()
+            except:
+                if not is_exiting():
+                    import traceback
+                    sub_warning(
+                        'thread for sharing handles raised exception :\n' +
+                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
+                        )
+
+resource_sharer = ResourceSharer()
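Taken together, the Unix reducers above let a Connection be pickled by value of its descriptor: reduce_connection() wraps the fd in a DupFd, which registers a send callback with the per-process resource_sharer and yields an (address, key) token; rebuild_connection() later calls detach(), which connects back to that listener and receives the duplicated fd over SCM_RIGHTS. A hedged sketch of that round trip, assuming Linux and the Python 3.3 module layout introduced here:

    from multiprocessing import Pipe, reduction

    a, b = Pipe()
    rebuild, args = reduction.reduce_connection(b)   # -> (rebuild_connection, (DupFd, True, True))
    b2 = rebuild(*args)                              # fetches a duplicate fd via resource_sharer
    a.send('ping')
    print(b2.recv())                                 # -> 'ping'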
Lib/test/test_multiprocessing.py

@@ -1959,49 +1959,49 @@ class _TestPoll(unittest.TestCase):
 #
 # Test of sending connection and socket objects between processes
 #

-"""
+@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
 class _TestPicklingConnections(BaseTestCase):

     ALLOWED_TYPES = ('processes',)

-    def _listener(self, conn, families):
+    @classmethod
+    def _listener(cls, conn, families):
         for fam in families:
-            l = self.connection.Listener(family=fam)
+            l = cls.connection.Listener(family=fam)
             conn.send(l.address)
             new_conn = l.accept()
             conn.send(new_conn)
+            new_conn.close()
+            l.close()

-        if self.TYPE == 'processes':
-            l = socket.socket()
-            l.bind(('localhost', 0))
-            conn.send(l.getsockname())
-            l.listen(1)
-            new_conn, addr = l.accept()
-            conn.send(new_conn)
+        l = socket.socket()
+        l.bind(('localhost', 0))
+        conn.send(l.getsockname())
+        l.listen(1)
+        new_conn, addr = l.accept()
+        conn.send(new_conn)
+        new_conn.close()
+        l.close()

         conn.recv()

-    def _remote(self, conn):
+    @classmethod
+    def _remote(cls, conn):
         for (address, msg) in iter(conn.recv, None):
-            client = self.connection.Client(address)
+            client = cls.connection.Client(address)
             client.send(msg.upper())
             client.close()

-        if self.TYPE == 'processes':
-            address, msg = conn.recv()
-            client = socket.socket()
-            client.connect(address)
-            client.sendall(msg.upper())
-            client.close()
+        address, msg = conn.recv()
+        client = socket.socket()
+        client.connect(address)
+        client.sendall(msg.upper())
+        client.close()

         conn.close()

     def test_pickling(self):
-        try:
-            multiprocessing.allow_connection_pickling()
-        except ImportError:
-            return
-
         families = self.connection.families

         lconn, lconn0 = self.Pipe()

@@ -2025,16 +2025,12 @@ class _TestPicklingConnections(BaseTestCase):
         rconn.send(None)

-        if self.TYPE == 'processes':
-            msg = latin('This connection uses a normal socket')
-            address = lconn.recv()
-            rconn.send((address, msg))
-            new_conn = lconn.recv()
-            if hasattr(socket, 'fromfd'):
-                self.assertEqual(new_conn.recv(100), msg.upper())
-            else:
-                # XXX On Windows with Py2.6 need to backport fromfd()
-                discard = lconn.recv_bytes()
+        msg = latin('This connection uses a normal socket')
+        address = lconn.recv()
+        rconn.send((address, msg))
+        new_conn = lconn.recv()
+        self.assertEqual(new_conn.recv(100), msg.upper())
+        new_conn.close()

         lconn.send(None)

@@ -2043,7 +2039,46 @@ class _TestPicklingConnections(BaseTestCase):
         lp.join()
         rp.join()
-"""
+
+    @classmethod
+    def child_access(cls, conn):
+        w = conn.recv()
+        w.send('all is well')
+        w.close()
+
+        r = conn.recv()
+        msg = r.recv()
+        conn.send(msg*2)
+
+        conn.close()
+
+    def test_access(self):
+        # On Windows, if we do not specify a destination pid when
+        # using DupHandle then we need to be careful to use the
+        # correct access flags for DuplicateHandle(), or else
+        # DupHandle.detach() will raise PermissionError.  For example,
+        # for a read only pipe handle we should use
+        # access=FILE_GENERIC_READ.  (Unfortunately
+        # DUPLICATE_SAME_ACCESS does not work.)
+        conn, child_conn = self.Pipe()
+        p = self.Process(target=self.child_access, args=(child_conn,))
+        p.daemon = True
+        p.start()
+        child_conn.close()
+
+        r, w = self.Pipe(duplex=False)
+        conn.send(w)
+        w.close()
+        self.assertEqual(r.recv(), 'all is well')
+        r.close()
+
+        r, w = self.Pipe(duplex=False)
+        conn.send(r)
+        r.close()
+        w.send('foobar')
+        w.close()
+        self.assertEqual(conn.recv(), 'foobar'*2)

 #
 #
 #
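To exercise the updated tests against an in-tree build, something along these lines should work (regrtest's module form; adjust the interpreter path as needed):

    ./python -m test -v test_multiprocessing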
Misc/NEWS

@@ -71,6 +71,9 @@ Core and Builtins
 Library
 -------

+- Issue #4892: multiprocessing Connections can now be transferred over
+  multiprocessing Connections.  Patch by Richard Oudkerk (sbt).
+
 - Issue #14160: TarFile.extractfile() failed to resolve symbolic links when
   the links were not located in an archive subdirectory.
Modules/_winapi.c

@@ -1280,6 +1280,7 @@ PyInit__winapi(void)
     WINAPI_CONSTANT(F_DWORD, CREATE_NEW_CONSOLE);
     WINAPI_CONSTANT(F_DWORD, CREATE_NEW_PROCESS_GROUP);
     WINAPI_CONSTANT(F_DWORD, DUPLICATE_SAME_ACCESS);
+    WINAPI_CONSTANT(F_DWORD, DUPLICATE_CLOSE_SOURCE);
     WINAPI_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
     WINAPI_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
     WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);

@@ -1298,6 +1299,8 @@ PyInit__winapi(void)
     WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
     WINAPI_CONSTANT(F_DWORD, FILE_FLAG_FIRST_PIPE_INSTANCE);
     WINAPI_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED);
+    WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_READ);
+    WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_WRITE);
     WINAPI_CONSTANT(F_DWORD, GENERIC_READ);
     WINAPI_CONSTANT(F_DWORD, GENERIC_WRITE);
     WINAPI_CONSTANT(F_DWORD, INFINITE);

@@ -1310,6 +1313,7 @@ PyInit__winapi(void)
     WINAPI_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES);
    WINAPI_CONSTANT(F_DWORD, PIPE_WAIT);
     WINAPI_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
+    WINAPI_CONSTANT(F_DWORD, PROCESS_DUP_HANDLE);
     WINAPI_CONSTANT(F_DWORD, STARTF_USESHOWWINDOW);
     WINAPI_CONSTANT(F_DWORD, STARTF_USESTDHANDLES);
     WINAPI_CONSTANT(F_DWORD, STD_INPUT_HANDLE);