Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
55dcc5b9
Commit
55dcc5b9
authored
Feb 12, 2014
by
Guido van Rossum
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
asyncio: Change as_completed() to use a Queue, to avoid O(N**2) behavior. Fixes issue #20566.
parent
78e7e785
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
55 additions
and
21 deletions
+55
-21
Lib/asyncio/tasks.py
Lib/asyncio/tasks.py
+33
-20
Lib/test/test_asyncio/test_tasks.py
Lib/test/test_asyncio/test_tasks.py
+22
-1
No files found.
Lib/asyncio/tasks.py
View file @
55dcc5b9
...
...
@@ -463,7 +463,11 @@ def _wait(fs, timeout, return_when, loop):
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def
as_completed
(
fs
,
*
,
loop
=
None
,
timeout
=
None
):
"""Return an iterator whose values, when waited for, are Futures.
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
exceptions!) of the original Futures (or coroutines), in the order
in which and as soon as they complete.
This differs from PEP 3148; the proper way to use this is:
...
...
@@ -471,8 +475,8 @@ def as_completed(fs, *, loop=None, timeout=None):
result = yield from f # The 'yield from' may raise.
# Use result.
Raises TimeoutError if the timeout occurs before all Futures ar
e
done.
If a timeout is specified, the 'yield from' will rais
e
TimeoutError when the timeout occurs before all Futures are
done.
Note: The futures 'f' are not necessarily members of fs.
"""
...
...
@@ -481,27 +485,36 @@ def as_completed(fs, *, loop=None, timeout=None):
loop
=
loop
if
loop
is
not
None
else
events
.
get_event_loop
()
deadline
=
None
if
timeout
is
None
else
loop
.
time
()
+
timeout
todo
=
{
async
(
f
,
loop
=
loop
)
for
f
in
set
(
fs
)}
completed
=
collections
.
deque
()
from
.queues
import
Queue
# Import here to avoid circular import problem.
done
=
Queue
(
loop
=
loop
)
timeout_handle
=
None
def
_on_timeout
():
for
f
in
todo
:
f
.
remove_done_callback
(
_on_completion
)
done
.
put_nowait
(
None
)
# Queue a dummy value for _wait_for_one().
todo
.
clear
()
# Can't do todo.remove(f) in the loop.
def
_on_completion
(
f
):
if
not
todo
:
return
# _on_timeout() was here first.
todo
.
remove
(
f
)
done
.
put_nowait
(
f
)
if
not
todo
and
timeout_handle
is
not
None
:
timeout_handle
.
cancel
()
@
coroutine
def
_wait_for_one
():
while
not
completed
:
timeout
=
None
if
deadline
is
not
None
:
timeout
=
deadline
-
loop
.
time
()
if
timeout
<
0
:
raise
futures
.
TimeoutError
()
done
,
pending
=
yield
from
_wait
(
todo
,
timeout
,
FIRST_COMPLETED
,
loop
)
# Multiple callers might be waiting for the same events
# and getting the same outcome. Dedupe by updating todo.
for
f
in
done
:
if
f
in
todo
:
todo
.
remove
(
f
)
completed
.
append
(
f
)
f
=
completed
.
popleft
()
return
f
.
result
()
# May raise.
f
=
yield
from
done
.
get
()
if
f
is
None
:
# Dummy value from _on_timeout().
raise
futures
.
TimeoutError
return
f
.
result
()
# May raise f.exception().
for
f
in
todo
:
f
.
add_done_callback
(
_on_completion
)
if
todo
and
timeout
is
not
None
:
timeout_handle
=
loop
.
call_later
(
timeout
,
_on_timeout
)
for
_
in
range
(
len
(
todo
)):
yield
_wait_for_one
()
...
...
Lib/test/test_asyncio/test_tasks.py
View file @
55dcc5b9
...
...
@@ -779,7 +779,6 @@ class TaskTests(unittest.TestCase):
yield
0
yield
0
yield
0.1
yield
0.02
loop
=
test_utils
.
TestLoop
(
gen
)
self
.
addCleanup
(
loop
.
close
)
...
...
@@ -791,6 +790,8 @@ class TaskTests(unittest.TestCase):
def
foo
():
values
=
[]
for
f
in
asyncio
.
as_completed
([
a
,
b
],
timeout
=
0.12
,
loop
=
loop
):
if
values
:
loop
.
advance_time
(
0.02
)
try
:
v
=
yield
from
f
values
.
append
((
1
,
v
))
...
...
@@ -809,6 +810,26 @@ class TaskTests(unittest.TestCase):
loop
.
advance_time
(
10
)
loop
.
run_until_complete
(
asyncio
.
wait
([
a
,
b
],
loop
=
loop
))
def
test_as_completed_with_unused_timeout
(
self
):
def
gen
():
yield
yield
0
yield
0.01
loop
=
test_utils
.
TestLoop
(
gen
)
self
.
addCleanup
(
loop
.
close
)
a
=
asyncio
.
sleep
(
0.01
,
'a'
,
loop
=
loop
)
@
asyncio
.
coroutine
def
foo
():
for
f
in
asyncio
.
as_completed
([
a
],
timeout
=
1
,
loop
=
loop
):
v
=
yield
from
f
self
.
assertEqual
(
v
,
'a'
)
res
=
loop
.
run_until_complete
(
asyncio
.
Task
(
foo
(),
loop
=
loop
))
def
test_as_completed_reverse_wait
(
self
):
def
gen
():
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment