Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
72d527b2
Commit
72d527b2
authored
Jun 09, 2011
by
Éric Araujo
Browse files
Options
Browse Files
Download
Plain Diff
Branch merge
parents
b3c6db7c
97178498
Changes
15
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
706 additions
and
114 deletions
+706
-114
Doc/library/concurrent.futures.rst
Doc/library/concurrent.futures.rst
+19
-0
Doc/library/mmap.rst
Doc/library/mmap.rst
+8
-4
Lib/concurrent/futures/process.py
Lib/concurrent/futures/process.py
+74
-31
Lib/multiprocessing/connection.py
Lib/multiprocessing/connection.py
+97
-52
Lib/multiprocessing/forking.py
Lib/multiprocessing/forking.py
+1
-0
Lib/multiprocessing/queues.py
Lib/multiprocessing/queues.py
+3
-3
Lib/ntpath.py
Lib/ntpath.py
+13
-0
Lib/test/test_concurrent_futures.py
Lib/test/test_concurrent_futures.py
+15
-5
Lib/test/test_mmap.py
Lib/test/test_mmap.py
+29
-0
Misc/ACKS
Misc/ACKS
+1
-0
Misc/NEWS
Misc/NEWS
+10
-0
Modules/_multiprocessing/win32_functions.c
Modules/_multiprocessing/win32_functions.c
+374
-16
Modules/arraymodule.c
Modules/arraymodule.c
+1
-1
Modules/mmapmodule.c
Modules/mmapmodule.c
+24
-2
Modules/posixmodule.c
Modules/posixmodule.c
+37
-0
No files found.
Doc/library/concurrent.futures.rst
View file @
72d527b2
...
...
@@ -169,6 +169,12 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
of at most *max_workers* processes. If *max_workers* is ``None`` or not
given, it will default to the number of processors on the machine.
.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
was undefined but operations on the executor or its futures would often
freeze or deadlock.
.. _processpoolexecutor-example:
...
...
@@ -369,3 +375,16 @@ Module Functions
:pep:`3148` -- futures - execute computations asynchronously
The proposal which described this feature for inclusion in the Python
standard library.
Exception classes
-----------------
.. exception:: BrokenProcessPool
Derived from :exc:`RuntimeError`, this exception class is raised when
one of the workers of a :class:`ProcessPoolExecutor` has terminated
in a non-clean fashion (for example, if it was killed from the outside).
.. versionadded:: 3.3
Doc/library/mmap.rst
View file @
72d527b2
...
...
@@ -190,12 +190,16 @@ To map anonymous memory, -1 should be passed as the fileno along with the length
move will raise a :exc:`TypeError` exception.
.. method:: read(
num
)
.. method:: read(
[n]
)
Return a :class:`bytes` containing up to *num* bytes starting from the
current file position; the file position is updated to point after the
bytes that were returned.
Return a :class:`bytes` containing up to *n* bytes starting from the
current file position. If the argument is omitted, *None* or negative,
return all bytes from the current file position to the end of the
mapping. The file position is updated to point after the bytes that were
returned.
.. versionchanged:: 3.3
Argument can be omitted or *None*.
.. method:: read_byte()
...
...
Lib/concurrent/futures/process.py
View file @
72d527b2
...
...
@@ -46,10 +46,11 @@ Process #1..n:
__author__
=
'Brian Quinlan (brian@sweetapp.com)'
import
atexit
import
os
from
concurrent.futures
import
_base
import
queue
import
multiprocessing
from
multiprocessing.queues
import
SimpleQueue
from
multiprocessing.queues
import
SimpleQueue
,
SentinelReady
import
threading
import
weakref
...
...
@@ -122,7 +123,7 @@ def _process_worker(call_queue, result_queue):
call_item
=
call_queue
.
get
(
block
=
True
)
if
call_item
is
None
:
# Wake up queue management thread
result_queue
.
put
(
None
)
result_queue
.
put
(
os
.
getpid
()
)
return
try
:
r
=
call_item
.
fn
(
*
call_item
.
args
,
**
call_item
.
kwargs
)
...
...
@@ -194,29 +195,63 @@ def _queue_management_worker(executor_reference,
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
"""
nb_shutdown_processes
=
0
def
shutdown_one_process
():
"""Tell a worker to terminate, which will in turn wake us again"""
nonlocal
nb_shutdown_processes
call_queue
.
put
(
None
)
nb_shutdown_processes
+=
1
def
shutdown_worker
():
# This is an upper bound
nb_children_alive
=
sum
(
p
.
is_alive
()
for
p
in
processes
.
values
())
for
i
in
range
(
0
,
nb_children_alive
):
call_queue
.
put
(
None
)
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for
p
in
processes
.
values
():
p
.
join
()
while
True
:
_add_call_item_to_queue
(
pending_work_items
,
work_ids_queue
,
call_queue
)
result_item
=
result_queue
.
get
()
if
result_item
is
not
None
:
work_item
=
pending_work_items
[
result_item
.
work_id
]
del
pending_work_items
[
result_item
.
work_id
]
if
result_item
.
exception
:
work_item
.
future
.
set_exception
(
result_item
.
exception
)
else
:
work_item
.
future
.
set_result
(
result_item
.
result
)
continue
# If we come here, we either got a timeout or were explicitly woken up.
# In either case, check whether we should start shutting down.
sentinels
=
[
p
.
sentinel
for
p
in
processes
.
values
()]
assert
sentinels
try
:
result_item
=
result_queue
.
get
(
sentinels
=
sentinels
)
except
SentinelReady
as
e
:
# Mark the process pool broken so that submits fail right now.
executor
=
executor_reference
()
if
executor
is
not
None
:
executor
.
_broken
=
True
executor
.
_shutdown_thread
=
True
del
executor
# All futures in flight must be marked failed
for
work_id
,
work_item
in
pending_work_items
.
items
():
work_item
.
future
.
set_exception
(
BrokenProcessPool
(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
pending_work_items
.
clear
()
# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
for
p
in
processes
.
values
():
p
.
terminate
()
for
p
in
processes
.
values
():
p
.
join
()
return
if
isinstance
(
result_item
,
int
):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
del
processes
[
result_item
]
elif
result_item
is
not
None
:
work_item
=
pending_work_items
.
pop
(
result_item
.
work_id
,
None
)
# work_item can be None if another process terminated (see above)
if
work_item
is
not
None
:
if
result_item
.
exception
:
work_item
.
future
.
set_exception
(
result_item
.
exception
)
else
:
work_item
.
future
.
set_result
(
result_item
.
result
)
# Check whether we should start shutting down.
executor
=
executor_reference
()
# No more work items can be added if:
# - The interpreter is shutting down OR
...
...
@@ -226,17 +261,11 @@ def _queue_management_worker(executor_reference,
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if
not
pending_work_items
:
while
nb_shutdown_processes
<
len
(
processes
):
shutdown_one_process
()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for
p
in
processes
:
p
.
join
()
shutdown_worker
()
return
else
:
# Start shutting down by telling a process it can exit.
shutdown_one_process
(
)
call_queue
.
put
(
None
)
del
executor
_system_limits_checked
=
False
...
...
@@ -264,6 +293,14 @@ def _check_system_limits():
_system_limited
=
"system provides too few semaphores (%d available, 256 necessary)"
%
nsems_max
raise
NotImplementedError
(
_system_limited
)
class
BrokenProcessPool
(
RuntimeError
):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.
"""
class
ProcessPoolExecutor
(
_base
.
Executor
):
def
__init__
(
self
,
max_workers
=
None
):
"""Initializes a new ProcessPoolExecutor instance.
...
...
@@ -288,11 +325,13 @@ class ProcessPoolExecutor(_base.Executor):
self
.
_result_queue
=
SimpleQueue
()
self
.
_work_ids
=
queue
.
Queue
()
self
.
_queue_management_thread
=
None
self
.
_processes
=
set
()
# Map of pids to processes
self
.
_processes
=
{}
# Shutdown is a two-step process.
self
.
_shutdown_thread
=
False
self
.
_shutdown_lock
=
threading
.
Lock
()
self
.
_broken
=
False
self
.
_queue_count
=
0
self
.
_pending_work_items
=
{}
...
...
@@ -302,6 +341,8 @@ class ProcessPoolExecutor(_base.Executor):
def
weakref_cb
(
_
,
q
=
self
.
_result_queue
):
q
.
put
(
None
)
if
self
.
_queue_management_thread
is
None
:
# Start the processes so that their sentinels are known.
self
.
_adjust_process_count
()
self
.
_queue_management_thread
=
threading
.
Thread
(
target
=
_queue_management_worker
,
args
=
(
weakref
.
ref
(
self
,
weakref_cb
),
...
...
@@ -321,10 +362,13 @@ class ProcessPoolExecutor(_base.Executor):
args
=
(
self
.
_call_queue
,
self
.
_result_queue
))
p
.
start
()
self
.
_processes
.
add
(
p
)
self
.
_processes
[
p
.
pid
]
=
p
def
submit
(
self
,
fn
,
*
args
,
**
kwargs
):
with
self
.
_shutdown_lock
:
if
self
.
_broken
:
raise
BrokenProcessPool
(
'A child process terminated '
'abruptly, the process pool is not usable anymore'
)
if
self
.
_shutdown_thread
:
raise
RuntimeError
(
'cannot schedule new futures after shutdown'
)
...
...
@@ -338,7 +382,6 @@ class ProcessPoolExecutor(_base.Executor):
self
.
_result_queue
.
put
(
None
)
self
.
_start_queue_management_thread
()
self
.
_adjust_process_count
()
return
f
submit
.
__doc__
=
_base
.
Executor
.
submit
.
__doc__
...
...
Lib/multiprocessing/connection.py
View file @
72d527b2
...
...
@@ -48,14 +48,18 @@ import itertools
import
_multiprocessing
from
multiprocessing
import
current_process
,
AuthenticationError
,
BufferTooShort
from
multiprocessing.util
import
get_temp_dir
,
Finalize
,
sub_debug
,
debug
from
multiprocessing.util
import
(
get_temp_dir
,
Finalize
,
sub_debug
,
debug
,
_eintr_retry
)
try
:
from
_multiprocessing
import
win32
from
_subprocess
import
WAIT_OBJECT_0
,
WAIT_TIMEOUT
,
INFINITE
except
ImportError
:
if
sys
.
platform
==
'win32'
:
raise
win32
=
None
_select
=
_eintr_retry
(
select
.
select
)
#
#
#
...
...
@@ -118,6 +122,15 @@ def address_type(address):
else
:
raise
ValueError
(
'address type of %r unrecognized'
%
address
)
class
SentinelReady
(
Exception
):
"""
Raised when a sentinel is ready when polling.
"""
def
__init__
(
self
,
*
args
):
Exception
.
__init__
(
self
,
*
args
)
self
.
sentinels
=
args
[
0
]
#
# Connection classes
#
...
...
@@ -253,19 +266,17 @@ class _ConnectionBase:
(
offset
+
size
)
//
itemsize
])
return
size
def
recv
(
self
):
def
recv
(
self
,
sentinels
=
None
):
"""Receive a (picklable) object"""
self
.
_check_closed
()
self
.
_check_readable
()
buf
=
self
.
_recv_bytes
()
buf
=
self
.
_recv_bytes
(
sentinels
=
sentinels
)
return
pickle
.
loads
(
buf
.
getbuffer
())
def
poll
(
self
,
timeout
=
0.0
):
"""Whether there is any input available to be read"""
self
.
_check_closed
()
self
.
_check_readable
()
if
timeout
<
0.0
:
timeout
=
None
return
self
.
_poll
(
timeout
)
...
...
@@ -274,61 +285,88 @@ if win32:
class
PipeConnection
(
_ConnectionBase
):
"""
Connection class based on a Windows named pipe.
Overlapped I/O is used, so the handles must have been created
with FILE_FLAG_OVERLAPPED.
"""
_buffered
=
b''
def
_close
(
self
):
win32
.
CloseHandle
(
self
.
_handle
)
def
_send_bytes
(
self
,
buf
):
nwritten
=
win32
.
WriteFile
(
self
.
_handle
,
buf
)
overlapped
=
win32
.
WriteFile
(
self
.
_handle
,
buf
,
overlapped
=
True
)
nwritten
,
complete
=
overlapped
.
GetOverlappedResult
(
True
)
assert
complete
assert
nwritten
==
len
(
buf
)
def
_recv_bytes
(
self
,
maxsize
=
None
):
def
_recv_bytes
(
self
,
maxsize
=
None
,
sentinels
=
()):
if
sentinels
:
self
.
_poll
(
-
1.0
,
sentinels
)
buf
=
io
.
BytesIO
()
bufsize
=
512
if
maxsize
is
not
None
:
bufsize
=
min
(
bufsize
,
maxsize
)
try
:
firstchunk
,
complete
=
win32
.
ReadFile
(
self
.
_handle
,
bufsize
)
except
IOError
as
e
:
if
e
.
errno
==
win32
.
ERROR_BROKEN_PIPE
:
raise
EOFError
raise
lenfirstchunk
=
len
(
firstchunk
)
buf
.
write
(
firstchunk
)
if
complete
:
return
buf
firstchunk
=
self
.
_buffered
if
firstchunk
:
lenfirstchunk
=
len
(
firstchunk
)
buf
.
write
(
firstchunk
)
self
.
_buffered
=
b''
else
:
# A reasonable size for the first chunk transfer
bufsize
=
128
if
maxsize
is
not
None
and
maxsize
<
bufsize
:
bufsize
=
maxsize
try
:
overlapped
=
win32
.
ReadFile
(
self
.
_handle
,
bufsize
,
overlapped
=
True
)
lenfirstchunk
,
complete
=
overlapped
.
GetOverlappedResult
(
True
)
firstchunk
=
overlapped
.
getbuffer
()
assert
lenfirstchunk
==
len
(
firstchunk
)
except
IOError
as
e
:
if
e
.
errno
==
win32
.
ERROR_BROKEN_PIPE
:
raise
EOFError
raise
buf
.
write
(
firstchunk
)
if
complete
:
return
buf
navail
,
nleft
=
win32
.
PeekNamedPipe
(
self
.
_handle
)
if
maxsize
is
not
None
and
lenfirstchunk
+
nleft
>
maxsize
:
return
None
lastchunk
,
complete
=
win32
.
ReadFile
(
self
.
_handle
,
nleft
)
assert
complete
buf
.
write
(
lastchunk
)
if
nleft
>
0
:
overlapped
=
win32
.
ReadFile
(
self
.
_handle
,
nleft
,
overlapped
=
True
)
res
,
complete
=
overlapped
.
GetOverlappedResult
(
True
)
assert
res
==
nleft
assert
complete
buf
.
write
(
overlapped
.
getbuffer
())
return
buf
def
_poll
(
self
,
timeout
):
def
_poll
(
self
,
timeout
,
sentinels
=
()):
# Fast non-blocking path
navail
,
nleft
=
win32
.
PeekNamedPipe
(
self
.
_handle
)
if
navail
>
0
:
return
True
elif
timeout
==
0.0
:
return
False
# Setup a polling loop (translated straight from old
# pipe_connection.c)
# Blocking: use overlapped I/O
if
timeout
<
0.0
:
deadline
=
None
timeout
=
INFINITE
else
:
deadline
=
time
.
time
()
+
timeout
delay
=
0.001
max_delay
=
0.02
while
True
:
time
.
sleep
(
delay
)
navail
,
nleft
=
win32
.
PeekNamedPipe
(
self
.
_handle
)
if
navail
>
0
:
return
True
if
deadline
and
time
.
time
()
>
deadline
:
return
False
if
delay
<
max_delay
:
delay
+=
0.001
timeout
=
int
(
timeout
*
1000
+
0.5
)
overlapped
=
win32
.
ReadFile
(
self
.
_handle
,
1
,
overlapped
=
True
)
try
:
handles
=
[
overlapped
.
event
]
handles
+=
sentinels
res
=
win32
.
WaitForMultipleObjects
(
handles
,
False
,
timeout
)
finally
:
# Always cancel overlapped I/O in the same thread
# (because CancelIoEx() appears only in Vista)
overlapped
.
cancel
()
if
res
==
WAIT_TIMEOUT
:
return
False
idx
=
res
-
WAIT_OBJECT_0
if
idx
==
0
:
# I/O was successful, store received data
overlapped
.
GetOverlappedResult
(
True
)
self
.
_buffered
+=
overlapped
.
getbuffer
()
return
True
assert
0
<
idx
<
len
(
handles
)
raise
SentinelReady
([
handles
[
idx
]])
class
Connection
(
_ConnectionBase
):
...
...
@@ -357,11 +395,18 @@ class Connection(_ConnectionBase):
break
buf
=
buf
[
n
:]
def
_recv
(
self
,
size
,
read
=
_read
):
def
_recv
(
self
,
size
,
sentinels
=
(),
read
=
_read
):
buf
=
io
.
BytesIO
()
handle
=
self
.
_handle
if
sentinels
:
handles
=
[
handle
]
+
sentinels
remaining
=
size
while
remaining
>
0
:
chunk
=
read
(
self
.
_handle
,
remaining
)
if
sentinels
:
r
=
_select
(
handles
,
[],
[])[
0
]
if
handle
not
in
r
:
raise
SentinelReady
(
r
)
chunk
=
read
(
handle
,
remaining
)
n
=
len
(
chunk
)
if
n
==
0
:
if
remaining
==
size
:
...
...
@@ -381,15 +426,17 @@ class Connection(_ConnectionBase):
if
n
>
0
:
self
.
_send
(
buf
)
def
_recv_bytes
(
self
,
maxsize
=
None
):
buf
=
self
.
_recv
(
4
)
def
_recv_bytes
(
self
,
maxsize
=
None
,
sentinels
=
()
):
buf
=
self
.
_recv
(
4
,
sentinels
)
size
,
=
struct
.
unpack
(
"=i"
,
buf
.
getvalue
())
if
maxsize
is
not
None
and
size
>
maxsize
:
return
None
return
self
.
_recv
(
size
)
return
self
.
_recv
(
size
,
sentinels
)
def
_poll
(
self
,
timeout
):
r
=
select
.
select
([
self
.
_handle
],
[],
[],
timeout
)[
0
]
if
timeout
<
0.0
:
timeout
=
None
r
=
_select
([
self
.
_handle
],
[],
[],
timeout
)[
0
]
return
bool
(
r
)
...
...
@@ -495,23 +542,21 @@ else:
obsize
,
ibsize
=
0
,
BUFSIZE
h1
=
win32
.
CreateNamedPipe
(
address
,
openmode
,
address
,
openmode
|
win32
.
FILE_FLAG_OVERLAPPED
,
win32
.
PIPE_TYPE_MESSAGE
|
win32
.
PIPE_READMODE_MESSAGE
|
win32
.
PIPE_WAIT
,
1
,
obsize
,
ibsize
,
win32
.
NMPWAIT_WAIT_FOREVER
,
win32
.
NULL
)
h2
=
win32
.
CreateFile
(
address
,
access
,
0
,
win32
.
NULL
,
win32
.
OPEN_EXISTING
,
0
,
win32
.
NULL
address
,
access
,
0
,
win32
.
NULL
,
win32
.
OPEN_EXISTING
,
win32
.
FILE_FLAG_OVERLAPPED
,
win32
.
NULL
)
win32
.
SetNamedPipeHandleState
(
h2
,
win32
.
PIPE_READMODE_MESSAGE
,
None
,
None
)
try
:
win32
.
ConnectNamedPipe
(
h1
,
win32
.
NULL
)
except
WindowsError
as
e
:
if
e
.
args
[
0
]
!=
win32
.
ERROR_PIPE_CONNECTED
:
raise
overlapped
=
win32
.
ConnectNamedPipe
(
h1
,
overlapped
=
True
)
overlapped
.
GetOverlappedResult
(
True
)
c1
=
PipeConnection
(
h1
,
writable
=
duplex
)
c2
=
PipeConnection
(
h2
,
readable
=
duplex
)
...
...
Lib/multiprocessing/forking.py
View file @
72d527b2
...
...
@@ -35,6 +35,7 @@
import
os
import
sys
import
signal
import
select
from
multiprocessing
import
util
,
process
...
...
Lib/multiprocessing/queues.py
View file @
72d527b2
...
...
@@ -44,7 +44,7 @@ import weakref
from
queue
import
Empty
,
Full
import
_multiprocessing
from
multiprocessing
import
Pipe
from
multiprocessing
.connection
import
Pipe
,
SentinelReady
from
multiprocessing.synchronize
import
Lock
,
BoundedSemaphore
,
Semaphore
,
Condition
from
multiprocessing.util
import
debug
,
info
,
Finalize
,
register_after_fork
from
multiprocessing.forking
import
assert_spawning
...
...
@@ -372,10 +372,10 @@ class SimpleQueue(object):
def
_make_methods
(
self
):
recv
=
self
.
_reader
.
recv
racquire
,
rrelease
=
self
.
_rlock
.
acquire
,
self
.
_rlock
.
release
def
get
():
def
get
(
*
,
sentinels
=
None
):
racquire
()
try
:
return
recv
()
return
recv
(
sentinels
)
finally
:
rrelease
()
self
.
get
=
get
...
...
Lib/ntpath.py
View file @
72d527b2
...
...
@@ -672,3 +672,16 @@ except ImportError:
def
sameopenfile
(
f1
,
f2
):
"""Test whether two file objects reference the same file"""
return
_getfileinformation
(
f1
)
==
_getfileinformation
(
f2
)
try
:
# The genericpath.isdir implementation uses os.stat and checks the mode
# attribute to tell whether or not the path is a directory.
# This is overkill on Windows - just pass the path to GetFileAttributes
# and check the attribute from there.
from
nt
import
_isdir
except
ImportError
:
from
genericpath
import
isdir
as
_isdir
def
isdir
(
path
):
return
_isdir
(
path
)
Lib/test/test_concurrent_futures.py
View file @
72d527b2
...
...
@@ -19,7 +19,7 @@ import unittest
from
concurrent
import
futures
from
concurrent.futures._base
import
(
PENDING
,
RUNNING
,
CANCELLED
,
CANCELLED_AND_NOTIFIED
,
FINISHED
,
Future
)
import
concurrent.futures.process
from
concurrent.futures.process
import
BrokenProcessPool
def
create_future
(
state
=
PENDING
,
exception
=
None
,
result
=
None
):
...
...
@@ -154,7 +154,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
processes
=
self
.
executor
.
_processes
self
.
executor
.
shutdown
()
for
p
in
processes
:
for
p
in
processes
.
values
()
:
p
.
join
()
def
test_context_manager_shutdown
(
self
):
...
...
@@ -163,7 +163,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
self
.
assertEqual
(
list
(
e
.
map
(
abs
,
range
(
-
5
,
5
))),
[
5
,
4
,
3
,
2
,
1
,
0
,
1
,
2
,
3
,
4
])
for
p
in
processes
:
for
p
in
processes
.
values
()
:
p
.
join
()
def
test_del_shutdown
(
self
):
...
...
@@ -174,7 +174,7 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
del
executor
queue_management_thread
.
join
()
for
p
in
processes
:
for
p
in
processes
.
values
()
:
p
.
join
()
class
WaitTests
(
unittest
.
TestCase
):
...
...
@@ -381,7 +381,17 @@ class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
class
ProcessPoolExecutorTest
(
ProcessPoolMixin
,
ExecutorTest
):
pass
def
test_killed_child
(
self
):
# When a child process is abruptly terminated, the whole pool gets
# "broken".
futures
=
[
self
.
executor
.
submit
(
time
.
sleep
,
3
)]
# Get one of the processes, and terminate (kill) it
p
=
next
(
iter
(
self
.
executor
.
_processes
.
values
()))
p
.
terminate
()
for
fut
in
futures
:
self
.
assertRaises
(
BrokenProcessPool
,
fut
.
result
)
# Submitting other jobs fails as well.
self
.
assertRaises
(
BrokenProcessPool
,
self
.
executor
.
submit
,
pow
,
2
,
8
)
class
FutureTests
(
unittest
.
TestCase
):
...
...
Lib/test/test_mmap.py
View file @
72d527b2
...
...
@@ -417,6 +417,35 @@ class MmapTests(unittest.TestCase):
m
[
x
]
=
b
self
.
assertEqual
(
m
[
x
],
b
)
def
test_read_all
(
self
):
m
=
mmap
.
mmap
(
-
1
,
16
)
self
.
addCleanup
(
m
.
close
)
# With no parameters, or None or a negative argument, reads all
m
.
write
(
bytes
(
range
(
16
)))
m
.
seek
(
0
)
self
.
assertEqual
(
m
.
read
(),
bytes
(
range
(
16
)))
m
.
seek
(
8
)
self
.
assertEqual
(
m
.
read
(),
bytes
(
range
(
8
,
16
)))
m
.
seek
(
16
)
self
.
assertEqual
(
m
.
read
(),
b''
)
m
.
seek
(
3
)
self
.
assertEqual
(
m
.
read
(
None
),
bytes
(
range
(
3
,
16
)))
m
.
seek
(
4
)
self
.
assertEqual
(
m
.
read
(
-
1
),
bytes
(
range
(
4
,
16
)))
m
.
seek
(
5
)
self
.
assertEqual
(
m
.
read
(
-
2
),
bytes
(
range
(
5
,
16
)))
m
.
seek
(
9
)
self
.
assertEqual
(
m
.
read
(
-
42
),
bytes
(
range
(
9
,
16
)))
def
test_read_invalid_arg
(
self
):
m
=
mmap
.
mmap
(
-
1
,
16
)
self
.
addCleanup
(
m
.
close
)
self
.
assertRaises
(
TypeError
,
m
.
read
,
'foo'
)
self
.
assertRaises
(
TypeError
,
m
.
read
,
5.5
)
self
.
assertRaises
(
TypeError
,
m
.
read
,
[
1
,
2
,
3
])
def
test_extended_getslice
(
self
):
# Test extended slicing by comparing with list slicing.
s
=
bytes
(
reversed
(
range
(
256
)))
...
...
Misc/ACKS
View file @
72d527b2
...
...
@@ -548,6 +548,7 @@ Vincent Legoll
Kip Lehman
Joerg Lehmann
Robert Lehmann
Petri Lehtinen
Luke Kenneth Casson Leighton
Marc-Andre Lemburg
John Lenton
...
...
Misc/NEWS
View file @
72d527b2
...
...
@@ -187,6 +187,16 @@ Core and Builtins
Library
-------
- Issue #11583: Speed up os.path.isdir on Windows by using GetFileAttributes
instead of os.stat.
- Issue #12021: Make mmap'
s
read
()
method
argument
optional
.
Patch
by
Petri
Lehtinen
.
-
Issue
#
9205
:
concurrent
.
futures
.
ProcessPoolExecutor
now
detects
killed
children
and
raises
BrokenProcessPool
in
such
a
situation
.
Previously
it
would
reliably
freeze
/
deadlock
.
-
Issue
#
12040
:
Expose
a
new
attribute
``
sentinel
``
on
instances
of
:
class
:`
multiprocessing
.
Process
`.
Also
,
fix
Process
.
join
()
to
not
use
polling
anymore
,
when
given
a
timeout
.
...
...
Modules/_multiprocessing/win32_functions.c
View file @
72d527b2
This diff is collapsed.
Click to expand it.
Modules/arraymodule.c
View file @
72d527b2
...
...
@@ -2091,7 +2091,7 @@ array_repr(arrayobject *a)
if
(
len
==
0
)
{
return
PyUnicode_FromFormat
(
"array('%c')"
,
(
int
)
typecode
);
}
if
(
'u'
==
typecode
)
if
(
typecode
==
'u'
)
v
=
array_tounicode
(
a
,
NULL
);
else
v
=
array_tolist
(
a
,
NULL
);
...
...
Modules/mmapmodule.c
View file @
72d527b2
...
...
@@ -240,15 +240,37 @@ mmap_read_line_method(mmap_object *self,
return
result
;
}
/* Basically the "n" format code with the ability to turn None into -1. */
static
int
mmap_convert_ssize_t
(
PyObject
*
obj
,
void
*
result
)
{
Py_ssize_t
limit
;
if
(
obj
==
Py_None
)
{
limit
=
-
1
;
}
else
if
(
PyNumber_Check
(
obj
))
{
limit
=
PyNumber_AsSsize_t
(
obj
,
PyExc_OverflowError
);
if
(
limit
==
-
1
&&
PyErr_Occurred
())
return
0
;
}
else
{
PyErr_Format
(
PyExc_TypeError
,
"integer argument expected, got '%.200s'"
,
Py_TYPE
(
obj
)
->
tp_name
);
return
0
;
}
*
((
Py_ssize_t
*
)
result
)
=
limit
;
return
1
;
}
static
PyObject
*
mmap_read_method
(
mmap_object
*
self
,
PyObject
*
args
)
{
Py_ssize_t
num_bytes
,
n
;
Py_ssize_t
num_bytes
=
-
1
,
n
;
PyObject
*
result
;
CHECK_VALID
(
NULL
);
if
(
!
PyArg_ParseTuple
(
args
,
"
n:read"
,
&
num_bytes
))
if
(
!
PyArg_ParseTuple
(
args
,
"
|O&:read"
,
mmap_convert_ssize_t
,
&
num_bytes
))
return
(
NULL
);
/* silently 'adjust' out-of-range requests */
...
...
Modules/posixmodule.c
View file @
72d527b2
...
...
@@ -2960,6 +2960,42 @@ posix__getfileinformation(PyObject *self, PyObject *args)
info
.
nFileIndexHigh
,
info
.
nFileIndexLow
);
}
static
PyObject
*
posix__isdir
(
PyObject
*
self
,
PyObject
*
args
)
{
PyObject
*
opath
;
char
*
path
;
PyUnicodeObject
*
po
;
DWORD
attributes
;
if
(
PyArg_ParseTuple
(
args
,
"U|:_isdir"
,
&
po
))
{
Py_UNICODE
*
wpath
=
PyUnicode_AS_UNICODE
(
po
);
attributes
=
GetFileAttributesW
(
wpath
);
if
(
attributes
==
INVALID_FILE_ATTRIBUTES
)
Py_RETURN_FALSE
;
goto
check
;
}
/* Drop the argument parsing error as narrow strings
are also valid. */
PyErr_Clear
();
if
(
!
PyArg_ParseTuple
(
args
,
"O&:_isdir"
,
PyUnicode_FSConverter
,
&
opath
))
return
NULL
;
path
=
PyBytes_AsString
(
opath
);
attributes
=
GetFileAttributesA
(
path
);
if
(
attributes
==
INVALID_FILE_ATTRIBUTES
)
Py_RETURN_FALSE
;
check:
if
(
attributes
&
FILE_ATTRIBUTE_DIRECTORY
)
Py_RETURN_TRUE
;
else
Py_RETURN_FALSE
;
}
#endif
/* MS_WINDOWS */
PyDoc_STRVAR
(
posix_mkdir__doc__
,
...
...
@@ -9561,6 +9597,7 @@ static PyMethodDef posix_methods[] = {
{
"_getfullpathname"
,
posix__getfullpathname
,
METH_VARARGS
,
NULL
},
{
"_getfinalpathname"
,
posix__getfinalpathname
,
METH_VARARGS
,
NULL
},
{
"_getfileinformation"
,
posix__getfileinformation
,
METH_VARARGS
,
NULL
},
{
"_isdir"
,
posix__isdir
,
METH_VARARGS
,
NULL
},
#endif
#ifdef HAVE_GETLOADAVG
{
"getloadavg"
,
posix_getloadavg
,
METH_NOARGS
,
posix_getloadavg__doc__
},
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment