Commit 0f9f7497 authored by Victor Stinner's avatar Victor Stinner

Merge 3.4 (asyncio)

parents 42cc0b71 956de691
...@@ -22,6 +22,7 @@ import logging ...@@ -22,6 +22,7 @@ import logging
import os import os
import socket import socket
import subprocess import subprocess
import threading
import time import time
import traceback import traceback
import sys import sys
...@@ -168,7 +169,9 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -168,7 +169,9 @@ class BaseEventLoop(events.AbstractEventLoop):
self._scheduled = [] self._scheduled = []
self._default_executor = None self._default_executor = None
self._internal_fds = 0 self._internal_fds = 0
self._running = False # Identifier of the thread running the event loop, or None if the
# event loop is not running
self._owner = None
self._clock_resolution = time.get_clock_info('monotonic').resolution self._clock_resolution = time.get_clock_info('monotonic').resolution
self._exception_handler = None self._exception_handler = None
self._debug = (not sys.flags.ignore_environment self._debug = (not sys.flags.ignore_environment
...@@ -246,9 +249,9 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -246,9 +249,9 @@ class BaseEventLoop(events.AbstractEventLoop):
def run_forever(self): def run_forever(self):
"""Run until stop() is called.""" """Run until stop() is called."""
self._check_closed() self._check_closed()
if self._running: if self.is_running():
raise RuntimeError('Event loop is running.') raise RuntimeError('Event loop is running.')
self._running = True self._owner = threading.get_ident()
try: try:
while True: while True:
try: try:
...@@ -256,7 +259,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -256,7 +259,7 @@ class BaseEventLoop(events.AbstractEventLoop):
except _StopError: except _StopError:
break break
finally: finally:
self._running = False self._owner = None
def run_until_complete(self, future): def run_until_complete(self, future):
"""Run until the Future is done. """Run until the Future is done.
...@@ -311,7 +314,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -311,7 +314,7 @@ class BaseEventLoop(events.AbstractEventLoop):
The event loop must not be running. The event loop must not be running.
""" """
if self._running: if self.is_running():
raise RuntimeError("Cannot close a running event loop") raise RuntimeError("Cannot close a running event loop")
if self._closed: if self._closed:
return return
...@@ -331,7 +334,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -331,7 +334,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def is_running(self): def is_running(self):
"""Returns True if the event loop is running.""" """Returns True if the event loop is running."""
return self._running return (self._owner is not None)
def time(self): def time(self):
"""Return the time according to the event loop's clock. """Return the time according to the event loop's clock.
...@@ -373,7 +376,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -373,7 +376,7 @@ class BaseEventLoop(events.AbstractEventLoop):
raise TypeError("coroutines cannot be used with call_at()") raise TypeError("coroutines cannot be used with call_at()")
self._check_closed() self._check_closed()
if self._debug: if self._debug:
self._assert_is_current_event_loop() self._check_thread()
timer = events.TimerHandle(when, callback, args, self) timer = events.TimerHandle(when, callback, args, self)
if timer._source_traceback: if timer._source_traceback:
del timer._source_traceback[-1] del timer._source_traceback[-1]
...@@ -391,17 +394,17 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -391,17 +394,17 @@ class BaseEventLoop(events.AbstractEventLoop):
Any positional arguments after the callback will be passed to Any positional arguments after the callback will be passed to
the callback when it is called. the callback when it is called.
""" """
handle = self._call_soon(callback, args, check_loop=True) if self._debug:
self._check_thread()
handle = self._call_soon(callback, args)
if handle._source_traceback: if handle._source_traceback:
del handle._source_traceback[-1] del handle._source_traceback[-1]
return handle return handle
def _call_soon(self, callback, args, check_loop): def _call_soon(self, callback, args):
if (coroutines.iscoroutine(callback) if (coroutines.iscoroutine(callback)
or coroutines.iscoroutinefunction(callback)): or coroutines.iscoroutinefunction(callback)):
raise TypeError("coroutines cannot be used with call_soon()") raise TypeError("coroutines cannot be used with call_soon()")
if self._debug and check_loop:
self._assert_is_current_event_loop()
self._check_closed() self._check_closed()
handle = events.Handle(callback, args, self) handle = events.Handle(callback, args, self)
if handle._source_traceback: if handle._source_traceback:
...@@ -409,8 +412,8 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -409,8 +412,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._ready.append(handle) self._ready.append(handle)
return handle return handle
def _assert_is_current_event_loop(self): def _check_thread(self):
"""Asserts that this event loop is the current event loop. """Check that the current thread is the thread running the event loop.
Non-thread-safe methods of this class make this assumption and will Non-thread-safe methods of this class make this assumption and will
likely behave incorrectly when the assumption is violated. likely behave incorrectly when the assumption is violated.
...@@ -418,18 +421,17 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -418,18 +421,17 @@ class BaseEventLoop(events.AbstractEventLoop):
Should only be called when (self._debug == True). The caller is Should only be called when (self._debug == True). The caller is
responsible for checking this condition for performance reasons. responsible for checking this condition for performance reasons.
""" """
try: if self._owner is None:
current = events.get_event_loop()
except RuntimeError:
return return
if current is not self: thread_id = threading.get_ident()
if thread_id != self._owner:
raise RuntimeError( raise RuntimeError(
"Non-thread-safe operation invoked on an event loop other " "Non-thread-safe operation invoked on an event loop other "
"than the current one") "than the current one")
def call_soon_threadsafe(self, callback, *args): def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread-safe.""" """Like call_soon(), but thread-safe."""
handle = self._call_soon(callback, args, check_loop=False) handle = self._call_soon(callback, args)
if handle._source_traceback: if handle._source_traceback:
del handle._source_traceback[-1] del handle._source_traceback[-1]
self._write_to_self() self._write_to_self()
......
...@@ -383,7 +383,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): ...@@ -383,7 +383,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
sock, protocol, waiter, extra) sock, protocol, waiter, extra)
def close(self): def close(self):
if self._running: if self.is_running():
raise RuntimeError("Cannot close a running event loop") raise RuntimeError("Cannot close a running event loop")
if self.is_closed(): if self.is_closed():
return return
...@@ -432,9 +432,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): ...@@ -432,9 +432,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._ssock.setblocking(False) self._ssock.setblocking(False)
self._csock.setblocking(False) self._csock.setblocking(False)
self._internal_fds += 1 self._internal_fds += 1
# don't check the current loop because _make_self_pipe() is called self.call_soon(self._loop_self_reading)
# from the event loop constructor
self._call_soon(self._loop_self_reading, (), check_loop=False)
def _loop_self_reading(self, f=None): def _loop_self_reading(self, f=None):
try: try:
......
...@@ -68,7 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -68,7 +68,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
address, waiter, extra) address, waiter, extra)
def close(self): def close(self):
if self._running: if self.is_running():
raise RuntimeError("Cannot close a running event loop") raise RuntimeError("Cannot close a running event loop")
if self.is_closed(): if self.is_closed():
return return
......
...@@ -5,6 +5,7 @@ import logging ...@@ -5,6 +5,7 @@ import logging
import math import math
import socket import socket
import sys import sys
import threading
import time import time
import unittest import unittest
from unittest import mock from unittest import mock
...@@ -14,8 +15,8 @@ from asyncio import base_events ...@@ -14,8 +15,8 @@ from asyncio import base_events
from asyncio import constants from asyncio import constants
from asyncio import test_utils from asyncio import test_utils
try: try:
from test.script_helper import assert_python_ok
from test import support from test import support
from test.script_helper import assert_python_ok
except ImportError: except ImportError:
from asyncio import test_support as support from asyncio import test_support as support
from asyncio.test_support import assert_python_ok from asyncio.test_support import assert_python_ok
...@@ -148,28 +149,71 @@ class BaseEventLoopTests(test_utils.TestCase): ...@@ -148,28 +149,71 @@ class BaseEventLoopTests(test_utils.TestCase):
# are really slow # are really slow
self.assertLessEqual(dt, 0.9, dt) self.assertLessEqual(dt, 0.9, dt)
def test_assert_is_current_event_loop(self): def check_thread(self, loop, debug):
def cb(): def cb():
pass pass
other_loop = base_events.BaseEventLoop() loop.set_debug(debug)
other_loop._selector = mock.Mock() if debug:
asyncio.set_event_loop(other_loop) msg = ("Non-thread-safe operation invoked on an event loop other "
"than the current one")
with self.assertRaisesRegex(RuntimeError, msg):
loop.call_soon(cb)
with self.assertRaisesRegex(RuntimeError, msg):
loop.call_later(60, cb)
with self.assertRaisesRegex(RuntimeError, msg):
loop.call_at(loop.time() + 60, cb)
else:
loop.call_soon(cb)
loop.call_later(60, cb)
loop.call_at(loop.time() + 60, cb)
def test_check_thread(self):
def check_in_thread(loop, event, debug, create_loop, fut):
# wait until the event loop is running
event.wait()
try:
if create_loop:
loop2 = base_events.BaseEventLoop()
try:
asyncio.set_event_loop(loop2)
self.check_thread(loop, debug)
finally:
asyncio.set_event_loop(None)
loop2.close()
else:
self.check_thread(loop, debug)
except Exception as exc:
loop.call_soon_threadsafe(fut.set_exception, exc)
else:
loop.call_soon_threadsafe(fut.set_result, None)
def test_thread(loop, debug, create_loop=False):
event = threading.Event()
fut = asyncio.Future(loop=loop)
loop.call_soon(event.set)
args = (loop, event, debug, create_loop, fut)
thread = threading.Thread(target=check_in_thread, args=args)
thread.start()
loop.run_until_complete(fut)
thread.join()
# raise RuntimeError if the event loop is different in debug mode self.loop._process_events = mock.Mock()
self.loop.set_debug(True) self.loop._write_to_self = mock.Mock()
with self.assertRaises(RuntimeError):
self.loop.call_soon(cb) # raise RuntimeError if the thread has no event loop
with self.assertRaises(RuntimeError): test_thread(self.loop, True)
self.loop.call_later(60, cb)
with self.assertRaises(RuntimeError):
self.loop.call_at(self.loop.time() + 60, cb)
# check disabled if debug mode is disabled # check disabled if debug mode is disabled
self.loop.set_debug(False) test_thread(self.loop, False)
self.loop.call_soon(cb)
self.loop.call_later(60, cb) # raise RuntimeError if the event loop of the thread is not the called
self.loop.call_at(self.loop.time() + 60, cb) # event loop
test_thread(self.loop, True, create_loop=True)
# check disabled if debug mode is disabled
test_thread(self.loop, False, create_loop=True)
def test_run_once_in_executor_handle(self): def test_run_once_in_executor_handle(self):
def cb(): def cb():
......
...@@ -27,7 +27,7 @@ from asyncio import proactor_events ...@@ -27,7 +27,7 @@ from asyncio import proactor_events
from asyncio import selector_events from asyncio import selector_events
from asyncio import test_utils from asyncio import test_utils
try: try:
from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR from test import support
except ImportError: except ImportError:
from asyncio import test_support as support from asyncio import test_support as support
......
...@@ -10,7 +10,7 @@ from unittest import mock ...@@ -10,7 +10,7 @@ from unittest import mock
import asyncio import asyncio
from asyncio import test_utils from asyncio import test_utils
try: try:
from test import support # gc_collect from test import support
except ImportError: except ImportError:
from asyncio import test_support as support from asyncio import test_support as support
......
...@@ -440,17 +440,16 @@ class BaseProactorEventLoopTests(test_utils.TestCase): ...@@ -440,17 +440,16 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.loop = EventLoop(self.proactor) self.loop = EventLoop(self.proactor)
self.set_event_loop(self.loop, cleanup=False) self.set_event_loop(self.loop, cleanup=False)
@mock.patch.object(BaseProactorEventLoop, '_call_soon') @mock.patch.object(BaseProactorEventLoop, 'call_soon')
@mock.patch.object(BaseProactorEventLoop, '_socketpair') @mock.patch.object(BaseProactorEventLoop, '_socketpair')
def test_ctor(self, socketpair, _call_soon): def test_ctor(self, socketpair, call_soon):
ssock, csock = socketpair.return_value = ( ssock, csock = socketpair.return_value = (
mock.Mock(), mock.Mock()) mock.Mock(), mock.Mock())
loop = BaseProactorEventLoop(self.proactor) loop = BaseProactorEventLoop(self.proactor)
self.assertIs(loop._ssock, ssock) self.assertIs(loop._ssock, ssock)
self.assertIs(loop._csock, csock) self.assertIs(loop._csock, csock)
self.assertEqual(loop._internal_fds, 1) self.assertEqual(loop._internal_fds, 1)
_call_soon.assert_called_with(loop._loop_self_reading, (), call_soon.assert_called_with(loop._loop_self_reading)
check_loop=False)
def test_close_self_pipe(self): def test_close_self_pipe(self):
self.loop._close_self_pipe() self.loop._close_self_pipe()
......
"""Tests for selector_events.py""" """Tests for selector_events.py"""
import errno import errno
import gc
import pprint
import socket import socket
import sys
import unittest import unittest
from unittest import mock from unittest import mock
try: try:
......
...@@ -6,12 +6,12 @@ from unittest import mock ...@@ -6,12 +6,12 @@ from unittest import mock
import asyncio import asyncio
from asyncio import subprocess from asyncio import subprocess
from asyncio import test_utils from asyncio import test_utils
if sys.platform != 'win32':
from asyncio import unix_events
try: try:
from test import support # PIPE_MAX_SIZE from test import support
except ImportError: except ImportError:
from asyncio import test_support as support from asyncio import test_support as support
if sys.platform != 'win32':
from asyncio import unix_events
# Program blocking # Program blocking
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)'] PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
...@@ -233,19 +233,12 @@ if sys.platform != 'win32': ...@@ -233,19 +233,12 @@ if sys.platform != 'win32':
def setUp(self): def setUp(self):
policy = asyncio.get_event_loop_policy() policy = asyncio.get_event_loop_policy()
self.loop = policy.new_event_loop() self.loop = policy.new_event_loop()
self.set_event_loop(self.loop)
# ensure that the event loop is passed explicitly in asyncio
policy.set_event_loop(None)
watcher = self.Watcher() watcher = self.Watcher()
watcher.attach_loop(self.loop) watcher.attach_loop(self.loop)
policy.set_child_watcher(watcher) policy.set_child_watcher(watcher)
self.addCleanup(policy.set_child_watcher, None)
def tearDown(self):
policy = asyncio.get_event_loop_policy()
policy.set_child_watcher(None)
self.loop.close()
super().tearDown()
class SubprocessSafeWatcherTests(SubprocessWatcherMixin, class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase): test_utils.TestCase):
...@@ -262,17 +255,8 @@ else: ...@@ -262,17 +255,8 @@ else:
class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase): class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
def setUp(self): def setUp(self):
policy = asyncio.get_event_loop_policy()
self.loop = asyncio.ProactorEventLoop() self.loop = asyncio.ProactorEventLoop()
self.set_event_loop(self.loop)
# ensure that the event loop is passed explicitly in asyncio
policy.set_event_loop(None)
def tearDown(self):
policy = asyncio.get_event_loop_policy()
self.loop.close()
policy.set_event_loop(None)
super().tearDown()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -7,17 +7,17 @@ import types ...@@ -7,17 +7,17 @@ import types
import unittest import unittest
import weakref import weakref
from unittest import mock from unittest import mock
import asyncio
from asyncio import coroutines
from asyncio import test_utils
try: try:
from test import support # gc_collect from test import support
from test.script_helper import assert_python_ok from test.script_helper import assert_python_ok
except ImportError: except ImportError:
from asyncio import test_support as support from asyncio import test_support as support
from asyncio.test_support import assert_python_ok from asyncio.test_support import assert_python_ok
import asyncio
from asyncio import coroutines
from asyncio import test_utils
PY34 = (sys.version_info >= (3, 4)) PY34 = (sys.version_info >= (3, 4))
PY35 = (sys.version_info >= (3, 5)) PY35 = (sys.version_info >= (3, 5))
......
"""Tests for unix_events.py.""" """Tests for unix_events.py."""
import collections import collections
import gc
import errno import errno
import io import io
import os import os
import pprint
import signal import signal
import socket import socket
import stat import stat
......
...@@ -5,18 +5,17 @@ import sys ...@@ -5,18 +5,17 @@ import sys
import unittest import unittest
from unittest import mock from unittest import mock
try:
from test import support # gc_collect, IPV6_ENABLED
except ImportError:
from asyncio import test_support as support
if sys.platform != 'win32': if sys.platform != 'win32':
raise unittest.SkipTest('Windows only') raise unittest.SkipTest('Windows only')
import _winapi import _winapi
from asyncio import windows_utils
from asyncio import _overlapped from asyncio import _overlapped
from asyncio import windows_utils
try:
from test import support
except ImportError:
from asyncio import test_support as support
class WinsocketpairTests(unittest.TestCase): class WinsocketpairTests(unittest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment