Commit 8dffc456 authored by Victor Stinner's avatar Victor Stinner

Update asyncio from the Tulip project

Major changes:

- StreamReader.readexactly() now raises an IncompleteReadError if the
  end of stream is reached before we received enough bytes, instead of
  returning less bytes than requested.

- Unit tests use the main asyncio module instead of submodules like events

- _UnixWritePipeTransport now also supports character devices, as
  _UnixReadPipeTransport. Patch written by Jonathan Slenders.

- Export more symbols: BaseEventLoop, BaseProactorEventLoop,
  BaseSelectorEventLoop, Queue and Queue sublasses, Empty, Full
parent 75a5ec88
......@@ -18,13 +18,17 @@ if sys.platform == 'win32':
import _overlapped # Will also be exported.
# This relies on each of the submodules having an __all__ variable.
from .futures import *
from .base_events import *
from .events import *
from .futures import *
from .locks import *
from .transports import *
from .proactor_events import *
from .protocols import *
from .queues import *
from .selector_events import *
from .streams import *
from .tasks import *
from .transports import *
if sys.platform == 'win32': # pragma: no cover
from .windows_events import *
......@@ -32,10 +36,14 @@ else:
from .unix_events import * # pragma: no cover
__all__ = (futures.__all__ +
__all__ = (base_events.__all__ +
events.__all__ +
futures.__all__ +
locks.__all__ +
transports.__all__ +
proactor_events.__all__ +
protocols.__all__ +
queues.__all__ +
selector_events.__all__ +
streams.__all__ +
tasks.__all__)
tasks.__all__ +
transports.__all__)
......@@ -4,6 +4,8 @@ A proactor is a "notify-on-completion" multiplexer. Currently a
proactor is only implemented on Windows with IOCP.
"""
__all__ = ['BaseProactorEventLoop']
import socket
from . import base_events
......
......@@ -4,6 +4,8 @@ A selector is a "notify-when-ready" multiplexer. For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""
__all__ = ['BaseSelectorEventLoop']
import collections
import errno
import socket
......
"""Stream-related things."""
__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
'open_connection', 'start_server',
'open_connection', 'start_server', 'IncompleteReadError',
]
import collections
......@@ -14,6 +14,19 @@ from . import tasks
_DEFAULT_LIMIT = 2**16
class IncompleteReadError(EOFError):
"""
Incomplete read error. Attributes:
- partial: read bytes string before the end of stream was reached
- expected: total number of expected bytes
"""
def __init__(self, partial, expected):
EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
% (len(partial), expected))
self.partial = partial
self.expected = expected
@tasks.coroutine
def open_connection(host=None, port=None, *,
......@@ -403,12 +416,9 @@ class StreamReader:
while n > 0:
block = yield from self.read(n)
if not block:
break
partial = b''.join(blocks)
raise IncompleteReadError(partial, len(partial) + n)
blocks.append(block)
n -= len(block)
# TODO: Raise EOFError if we break before n == 0? (That would
# be a change in specification, but I've always had to add an
# explicit size check to the caller.)
return b''.join(blocks)
......@@ -259,9 +259,11 @@ class _UnixWritePipeTransport(transports.WriteTransport):
self._fileno = pipe.fileno()
mode = os.fstat(self._fileno).st_mode
is_socket = stat.S_ISSOCK(mode)
is_pipe = stat.S_ISFIFO(mode)
if not (is_socket or is_pipe):
raise ValueError("Pipe transport is for pipes/sockets only.")
if not (is_socket or
stat.S_ISFIFO(mode) or
stat.S_ISCHR(mode)):
raise ValueError("Pipe transport is only for "
"pipes, sockets and character devices")
_set_nonblocking(self._fileno)
self._protocol = protocol
self._buffer = []
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -5,7 +5,6 @@ import unittest
import unittest.mock
import asyncio
from asyncio.proactor_events import BaseProactorEventLoop
from asyncio.proactor_events import _ProactorSocketTransport
from asyncio.proactor_events import _ProactorWritePipeTransport
from asyncio.proactor_events import _ProactorDuplexPipeTransport
......@@ -345,18 +344,18 @@ class BaseProactorEventLoopTests(unittest.TestCase):
self.ssock, self.csock = unittest.mock.Mock(), unittest.mock.Mock()
class EventLoop(BaseProactorEventLoop):
class EventLoop(asyncio.BaseProactorEventLoop):
def _socketpair(s):
return (self.ssock, self.csock)
self.loop = EventLoop(self.proactor)
@unittest.mock.patch.object(BaseProactorEventLoop, 'call_soon')
@unittest.mock.patch.object(BaseProactorEventLoop, '_socketpair')
@unittest.mock.patch.object(asyncio.BaseProactorEventLoop, 'call_soon')
@unittest.mock.patch.object(asyncio.BaseProactorEventLoop, '_socketpair')
def test_ctor(self, socketpair, call_soon):
ssock, csock = socketpair.return_value = (
unittest.mock.Mock(), unittest.mock.Mock())
loop = BaseProactorEventLoop(self.proactor)
loop = asyncio.BaseProactorEventLoop(self.proactor)
self.assertIs(loop._ssock, ssock)
self.assertIs(loop._csock, csock)
self.assertEqual(loop._internal_fds, 1)
......@@ -399,7 +398,7 @@ class BaseProactorEventLoopTests(unittest.TestCase):
def test_socketpair(self):
self.assertRaises(
NotImplementedError, BaseProactorEventLoop, self.proactor)
NotImplementedError, asyncio.BaseProactorEventLoop, self.proactor)
def test_make_socket_transport(self):
tr = self.loop._make_socket_transport(self.sock, unittest.mock.Mock())
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -3,17 +3,17 @@
import unittest
import unittest.mock
from asyncio import transports
import asyncio
class TransportTests(unittest.TestCase):
def test_ctor_extra_is_none(self):
transport = transports.Transport()
transport = asyncio.Transport()
self.assertEqual(transport._extra, {})
def test_get_extra_info(self):
transport = transports.Transport({'extra': 'info'})
transport = asyncio.Transport({'extra': 'info'})
self.assertEqual('info', transport.get_extra_info('extra'))
self.assertIsNone(transport.get_extra_info('unknown'))
......@@ -21,7 +21,7 @@ class TransportTests(unittest.TestCase):
self.assertIs(default, transport.get_extra_info('unknown', default))
def test_writelines(self):
transport = transports.Transport()
transport = asyncio.Transport()
transport.write = unittest.mock.Mock()
transport.writelines([b'line1',
......@@ -31,7 +31,7 @@ class TransportTests(unittest.TestCase):
transport.write.assert_called_with(b'line1line2line3')
def test_not_implemented(self):
transport = transports.Transport()
transport = asyncio.Transport()
self.assertRaises(NotImplementedError,
transport.set_write_buffer_limits)
......@@ -45,13 +45,13 @@ class TransportTests(unittest.TestCase):
self.assertRaises(NotImplementedError, transport.abort)
def test_dgram_not_implemented(self):
transport = transports.DatagramTransport()
transport = asyncio.DatagramTransport()
self.assertRaises(NotImplementedError, transport.sendto, 'data')
self.assertRaises(NotImplementedError, transport.abort)
def test_subprocess_transport_not_implemented(self):
transport = transports.SubprocessTransport()
transport = asyncio.SubprocessTransport()
self.assertRaises(NotImplementedError, transport.get_pid)
self.assertRaises(NotImplementedError, transport.get_returncode)
......
......@@ -17,9 +17,8 @@ if sys.platform == 'win32':
raise unittest.SkipTest('UNIX only')
from asyncio import events
from asyncio import futures
from asyncio import protocols
import asyncio
from asyncio import log
from asyncio import test_utils
from asyncio import unix_events
......@@ -28,8 +27,8 @@ from asyncio import unix_events
class SelectorEventLoopTests(unittest.TestCase):
def setUp(self):
self.loop = unix_events.SelectorEventLoop()
events.set_event_loop(None)
self.loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(None)
def tearDown(self):
self.loop.close()
......@@ -44,7 +43,7 @@ class SelectorEventLoopTests(unittest.TestCase):
self.loop._handle_signal(signal.NSIG + 1, ())
def test_handle_signal_cancelled_handler(self):
h = events.Handle(unittest.mock.Mock(), ())
h = asyncio.Handle(unittest.mock.Mock(), ())
h.cancel()
self.loop._signal_handlers[signal.NSIG + 1] = h
self.loop.remove_signal_handler = unittest.mock.Mock()
......@@ -68,7 +67,7 @@ class SelectorEventLoopTests(unittest.TestCase):
cb = lambda: True
self.loop.add_signal_handler(signal.SIGHUP, cb)
h = self.loop._signal_handlers.get(signal.SIGHUP)
self.assertIsInstance(h, events.Handle)
self.assertIsInstance(h, asyncio.Handle)
self.assertEqual(h._callback, cb)
@unittest.mock.patch('asyncio.unix_events.signal')
......@@ -205,7 +204,7 @@ class UnixReadPipeTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(protocols.Protocol)
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.pipe = unittest.mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
......@@ -228,7 +227,7 @@ class UnixReadPipeTransportTests(unittest.TestCase):
self.protocol.connection_made.assert_called_with(tr)
def test_ctor_with_waiter(self):
fut = futures.Future(loop=self.loop)
fut = asyncio.Future(loop=self.loop)
unix_events._UnixReadPipeTransport(
self.loop, self.pipe, self.protocol, fut)
test_utils.run_briefly(self.loop)
......@@ -368,7 +367,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
def setUp(self):
self.loop = test_utils.TestLoop()
self.protocol = test_utils.make_test_protocol(protocols.BaseProtocol)
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = unittest.mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
......@@ -391,7 +390,7 @@ class UnixWritePipeTransportTests(unittest.TestCase):
self.protocol.connection_made.assert_called_with(tr)
def test_ctor_with_waiter(self):
fut = futures.Future(loop=self.loop)
fut = asyncio.Future(loop=self.loop)
tr = unix_events._UnixWritePipeTransport(
self.loop, self.pipe, self.protocol, fut)
self.loop.assert_reader(5, tr._read_ready)
......@@ -682,7 +681,7 @@ class AbstractChildWatcherTests(unittest.TestCase):
def test_not_implemented(self):
f = unittest.mock.Mock()
watcher = unix_events.AbstractChildWatcher()
watcher = asyncio.AbstractChildWatcher()
self.assertRaises(
NotImplementedError, watcher.add_child_handler, f, f)
self.assertRaises(
......@@ -717,7 +716,7 @@ WaitPidMocks = collections.namedtuple("WaitPidMocks",
class ChildWatcherTestsMixin:
ignore_warnings = unittest.mock.patch.object(unix_events.logger, "warning")
ignore_warnings = unittest.mock.patch.object(log.logger, "warning")
def setUp(self):
self.loop = test_utils.TestLoop()
......@@ -730,7 +729,7 @@ class ChildWatcherTestsMixin:
self.watcher.attach_loop(self.loop)
def waitpid(self, pid, flags):
if isinstance(self.watcher, unix_events.SafeChildWatcher) or pid != -1:
if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
self.assertGreater(pid, 0)
try:
if pid < 0:
......@@ -1205,7 +1204,7 @@ class ChildWatcherTestsMixin:
# raise an exception
m.waitpid.side_effect = ValueError
with unittest.mock.patch.object(unix_events.logger,
with unittest.mock.patch.object(log.logger,
"exception") as m_exception:
self.assertEqual(self.watcher._sig_chld(), None)
......@@ -1240,7 +1239,7 @@ class ChildWatcherTestsMixin:
self.watcher._sig_chld()
callback.assert_called(m.waitpid)
if isinstance(self.watcher, unix_events.FastChildWatcher):
if isinstance(self.watcher, asyncio.FastChildWatcher):
# here the FastChildWatche enters a deadlock
# (there is no way to prevent it)
self.assertFalse(callback.called)
......@@ -1380,7 +1379,7 @@ class ChildWatcherTestsMixin:
self.watcher.add_child_handler(64, callback1)
self.assertEqual(len(self.watcher._callbacks), 1)
if isinstance(self.watcher, unix_events.FastChildWatcher):
if isinstance(self.watcher, asyncio.FastChildWatcher):
self.assertEqual(len(self.watcher._zombies), 1)
with unittest.mock.patch.object(
......@@ -1392,31 +1391,31 @@ class ChildWatcherTestsMixin:
m_remove_signal_handler.assert_called_once_with(
signal.SIGCHLD)
self.assertFalse(self.watcher._callbacks)
if isinstance(self.watcher, unix_events.FastChildWatcher):
if isinstance(self.watcher, asyncio.FastChildWatcher):
self.assertFalse(self.watcher._zombies)
class SafeChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
def create_watcher(self):
return unix_events.SafeChildWatcher()
return asyncio.SafeChildWatcher()
class FastChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
def create_watcher(self):
return unix_events.FastChildWatcher()
return asyncio.FastChildWatcher()
class PolicyTests(unittest.TestCase):
def create_policy(self):
return unix_events.DefaultEventLoopPolicy()
return asyncio.DefaultEventLoopPolicy()
def test_get_child_watcher(self):
policy = self.create_policy()
self.assertIsNone(policy._watcher)
watcher = policy.get_child_watcher()
self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
self.assertIs(policy._watcher, watcher)
......@@ -1425,7 +1424,7 @@ class PolicyTests(unittest.TestCase):
def test_get_child_watcher_after_set(self):
policy = self.create_policy()
watcher = unix_events.FastChildWatcher()
watcher = asyncio.FastChildWatcher()
policy.set_child_watcher(watcher)
self.assertIs(policy._watcher, watcher)
......@@ -1438,7 +1437,7 @@ class PolicyTests(unittest.TestCase):
self.assertIsNone(policy._watcher)
watcher = policy.get_child_watcher()
self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
self.assertIs(watcher._loop, loop)
loop.close()
......@@ -1449,10 +1448,10 @@ class PolicyTests(unittest.TestCase):
policy.set_event_loop(policy.new_event_loop())
self.assertIsInstance(policy.get_event_loop(),
events.AbstractEventLoop)
asyncio.AbstractEventLoop)
watcher = policy.get_child_watcher()
self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
self.assertIsNone(watcher._loop)
policy.get_event_loop().close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment