Commit d443f801 authored by Victor Stinner's avatar Victor Stinner

asyncio: sync with Tulip

* PipeHandle now uses None instead of -1 for a closed handle
* Sort imports in windows_utils.
* Fix test_events on Python older than 3.5. Skip SSL tests on the
  ProactorEventLoop if ssl.MemoryIO is missing
* Fix BaseEventLoop._create_connection_transport(). Close the transport if the
  creation of the transport (if the waiter) gets an exception.
* _ProactorBasePipeTransport now sets _sock to None when the transport is
  closed.
* Fix BaseSubprocessTransport.close(). Ignore pipes for which the protocol is
  not set yet (still equal to None).
* TestLoop.close() now calls the close() method of the parent class
  (BaseEventLoop).
* Cleanup BaseSelectorEventLoop: create the protocol on a separated line for
  readability and ease debugging.
* Fix BaseSubprocessTransport._kill_wait(). Set the _returncode attribute, so
  close() doesn't try to terminate the process.
* Tests: explicitly close event loops and transports
* UNIX pipe transports: add closed/closing in repr(). Add "closed" or "closing"
  state in the __repr__() method of _UnixReadPipeTransport and
  _UnixWritePipeTransport classes.
parent b6a5f65c
...@@ -634,7 +634,12 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -634,7 +634,12 @@ class BaseEventLoop(events.AbstractEventLoop):
else: else:
transport = self._make_socket_transport(sock, protocol, waiter) transport = self._make_socket_transport(sock, protocol, waiter)
try:
yield from waiter yield from waiter
except Exception as exc:
transport.close()
raise
return transport, protocol return transport, protocol
@coroutine @coroutine
......
...@@ -71,6 +71,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport): ...@@ -71,6 +71,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def close(self): def close(self):
for proto in self._pipes.values(): for proto in self._pipes.values():
if proto is None:
continue
proto.pipe.close() proto.pipe.close()
if self._returncode is None: if self._returncode is None:
self.terminate() self.terminate()
...@@ -119,7 +121,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport): ...@@ -119,7 +121,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
proc.kill() proc.kill()
except ProcessLookupError: except ProcessLookupError:
pass pass
proc.wait() self._returncode = proc.wait()
@coroutine @coroutine
def _post_init(self): def _post_init(self):
......
...@@ -111,6 +111,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin, ...@@ -111,6 +111,7 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
if hasattr(self._sock, 'shutdown'): if hasattr(self._sock, 'shutdown'):
self._sock.shutdown(socket.SHUT_RDWR) self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close() self._sock.close()
self._sock = None
server = self._server server = self._server
if server is not None: if server is not None:
server._detach() server._detach()
......
...@@ -182,13 +182,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -182,13 +182,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else: else:
raise # The event loop will catch, log and ignore it. raise # The event loop will catch, log and ignore it.
else: else:
protocol = protocol_factory()
if sslcontext: if sslcontext:
self._make_ssl_transport( self._make_ssl_transport(
conn, protocol_factory(), sslcontext, conn, protocol, sslcontext,
server_side=True, extra={'peername': addr}, server=server) server_side=True, extra={'peername': addr}, server=server)
else: else:
self._make_socket_transport( self._make_socket_transport(
conn, protocol_factory(), extra={'peername': addr}, conn, protocol , extra={'peername': addr},
server=server) server=server)
# It's now up to the protocol to handle the connection. # It's now up to the protocol to handle the connection.
......
...@@ -307,6 +307,7 @@ class TestLoop(base_events.BaseEventLoop): ...@@ -307,6 +307,7 @@ class TestLoop(base_events.BaseEventLoop):
self._time += advance self._time += advance
def close(self): def close(self):
super().close()
if self._check_on_close: if self._check_on_close:
try: try:
self._gen.send(0) self._gen.send(0)
......
...@@ -301,7 +301,12 @@ class _UnixReadPipeTransport(transports.ReadTransport): ...@@ -301,7 +301,12 @@ class _UnixReadPipeTransport(transports.ReadTransport):
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._fileno] info = [self.__class__.__name__]
if self._pipe is None:
info.append('closed')
elif self._closing:
info.append('closing')
info.append('fd=%s' % self._fileno)
if self._pipe is not None: if self._pipe is not None:
polling = selector_events._test_selector_event( polling = selector_events._test_selector_event(
self._loop._selector, self._loop._selector,
...@@ -404,7 +409,12 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, ...@@ -404,7 +409,12 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
self._loop.call_soon(waiter._set_result_unless_cancelled, None) self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._fileno] info = [self.__class__.__name__]
if self._pipe is None:
info.append('closed')
elif self._closing:
info.append('closing')
info.append('fd=%s' % self._fileno)
if self._pipe is not None: if self._pipe is not None:
polling = selector_events._test_selector_event( polling = selector_events._test_selector_event(
self._loop._selector, self._loop._selector,
......
...@@ -7,13 +7,13 @@ import sys ...@@ -7,13 +7,13 @@ import sys
if sys.platform != 'win32': # pragma: no cover if sys.platform != 'win32': # pragma: no cover
raise ImportError('win32 only') raise ImportError('win32 only')
import socket import _winapi
import itertools import itertools
import msvcrt import msvcrt
import os import os
import socket
import subprocess import subprocess
import tempfile import tempfile
import _winapi
__all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle'] __all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
...@@ -136,7 +136,7 @@ class PipeHandle: ...@@ -136,7 +136,7 @@ class PipeHandle:
self._handle = handle self._handle = handle
def __repr__(self): def __repr__(self):
if self._handle != -1: if self._handle is not None:
handle = 'handle=%r' % self._handle handle = 'handle=%r' % self._handle
else: else:
handle = 'closed' handle = 'closed'
...@@ -150,9 +150,9 @@ class PipeHandle: ...@@ -150,9 +150,9 @@ class PipeHandle:
return self._handle return self._handle
def close(self, *, CloseHandle=_winapi.CloseHandle): def close(self, *, CloseHandle=_winapi.CloseHandle):
if self._handle != -1: if self._handle is not None:
CloseHandle(self._handle) CloseHandle(self._handle)
self._handle = -1 self._handle = None
__del__ = close __del__ = close
......
...@@ -409,6 +409,7 @@ class BaseEventLoopTests(test_utils.TestCase): ...@@ -409,6 +409,7 @@ class BaseEventLoopTests(test_utils.TestCase):
def test_run_until_complete_loop(self): def test_run_until_complete_loop(self):
task = asyncio.Future(loop=self.loop) task = asyncio.Future(loop=self.loop)
other_loop = self.new_test_loop() other_loop = self.new_test_loop()
self.addCleanup(other_loop.close)
self.assertRaises(ValueError, self.assertRaises(ValueError,
other_loop.run_until_complete, task) other_loop.run_until_complete, task)
......
...@@ -25,6 +25,7 @@ import weakref ...@@ -25,6 +25,7 @@ import weakref
import asyncio import asyncio
from asyncio import proactor_events from asyncio import proactor_events
from asyncio import selector_events from asyncio import selector_events
from asyncio import sslproto
from asyncio import test_utils from asyncio import test_utils
try: try:
from test import support from test import support
...@@ -1585,6 +1586,7 @@ class SubprocessTestsMixin: ...@@ -1585,6 +1586,7 @@ class SubprocessTestsMixin:
self.assertTrue(all(f.done() for f in proto.disconnects.values())) self.assertTrue(all(f.done() for f in proto.disconnects.values()))
self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
self.assertEqual(proto.data[2], b'') self.assertEqual(proto.data[2], b'')
transp.close()
def test_subprocess_exitcode(self): def test_subprocess_exitcode(self):
connect = self.loop.subprocess_shell( connect = self.loop.subprocess_shell(
...@@ -1594,6 +1596,7 @@ class SubprocessTestsMixin: ...@@ -1594,6 +1596,7 @@ class SubprocessTestsMixin:
self.assertIsInstance(proto, MySubprocessProtocol) self.assertIsInstance(proto, MySubprocessProtocol)
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.assertEqual(7, proto.returncode) self.assertEqual(7, proto.returncode)
transp.close()
def test_subprocess_close_after_finish(self): def test_subprocess_close_after_finish(self):
connect = self.loop.subprocess_shell( connect = self.loop.subprocess_shell(
...@@ -1621,6 +1624,7 @@ class SubprocessTestsMixin: ...@@ -1621,6 +1624,7 @@ class SubprocessTestsMixin:
transp.kill() transp.kill()
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.check_killed(proto.returncode) self.check_killed(proto.returncode)
transp.close()
def test_subprocess_terminate(self): def test_subprocess_terminate(self):
prog = os.path.join(os.path.dirname(__file__), 'echo.py') prog = os.path.join(os.path.dirname(__file__), 'echo.py')
...@@ -1635,6 +1639,7 @@ class SubprocessTestsMixin: ...@@ -1635,6 +1639,7 @@ class SubprocessTestsMixin:
transp.terminate() transp.terminate()
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode) self.check_terminated(proto.returncode)
transp.close()
@unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
def test_subprocess_send_signal(self): def test_subprocess_send_signal(self):
...@@ -1650,6 +1655,7 @@ class SubprocessTestsMixin: ...@@ -1650,6 +1655,7 @@ class SubprocessTestsMixin:
transp.send_signal(signal.SIGHUP) transp.send_signal(signal.SIGHUP)
self.loop.run_until_complete(proto.completed) self.loop.run_until_complete(proto.completed)
self.assertEqual(-signal.SIGHUP, proto.returncode) self.assertEqual(-signal.SIGHUP, proto.returncode)
transp.close()
def test_subprocess_stderr(self): def test_subprocess_stderr(self):
prog = os.path.join(os.path.dirname(__file__), 'echo2.py') prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
...@@ -1784,6 +1790,22 @@ if sys.platform == 'win32': ...@@ -1784,6 +1790,22 @@ if sys.platform == 'win32':
def create_event_loop(self): def create_event_loop(self):
return asyncio.ProactorEventLoop() return asyncio.ProactorEventLoop()
if not sslproto._is_sslproto_available():
def test_create_ssl_connection(self):
raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl(self):
raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl_verify_failed(self):
raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl_match_failed(self):
raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_create_server_ssl_verified(self):
raise unittest.SkipTest("need python 3.5 (ssl.MemoryBIO)")
def test_legacy_create_ssl_connection(self): def test_legacy_create_ssl_connection(self):
raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL") raise unittest.SkipTest("IocpEventLoop incompatible with legacy SSL")
......
...@@ -29,6 +29,7 @@ class FutureTests(test_utils.TestCase): ...@@ -29,6 +29,7 @@ class FutureTests(test_utils.TestCase):
def setUp(self): def setUp(self):
self.loop = self.new_test_loop() self.loop = self.new_test_loop()
self.addCleanup(self.loop.close)
def test_initial_state(self): def test_initial_state(self):
f = asyncio.Future(loop=self.loop) f = asyncio.Future(loop=self.loop)
......
...@@ -1744,6 +1744,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase): ...@@ -1744,6 +1744,7 @@ class SelectorDatagramTransportTests(test_utils.TestCase):
test_utils.MockPattern( test_utils.MockPattern(
'Fatal error on transport\nprotocol:.*\ntransport:.*'), 'Fatal error on transport\nprotocol:.*\ntransport:.*'),
exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY)) exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY))
transport.close()
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -598,6 +598,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase): ...@@ -598,6 +598,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
# This is a bit overspecified. :-( # This is a bit overspecified. :-(
m_log.warning.assert_called_with( m_log.warning.assert_called_with(
'pipe closed by peer or os.write(pipe, data) raised exception.') 'pipe closed by peer or os.write(pipe, data) raised exception.')
tr.close()
@mock.patch('os.write') @mock.patch('os.write')
def test_write_close(self, m_write): def test_write_close(self, m_write):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment