Commit 3b5162d0 authored by Guido van Rossum's avatar Guido van Rossum

Merge 3.4->default: asyncio: Fix upstream issue 168: StreamReader.read(-1)...

Merge 3.4->default: asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data exceeds buffer limit.
parents 05278eea bf88ffba
...@@ -419,12 +419,17 @@ class StreamReader: ...@@ -419,12 +419,17 @@ class StreamReader:
return b'' return b''
if n < 0: if n < 0:
while not self._eof: # This used to just loop creating a new waiter hoping to
self._waiter = self._create_waiter('read') # collect everything in self._buffer, but that would
try: # deadlock if the subprocess sends more than self.limit
yield from self._waiter # bytes. So just call self.read(self._limit) until EOF.
finally: blocks = []
self._waiter = None while True:
block = yield from self.read(self._limit)
if not block:
break
blocks.append(block)
return b''.join(blocks)
else: else:
if not self._buffer and not self._eof: if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read') self._waiter = self._create_waiter('read')
......
"""Tests for streams.py.""" """Tests for streams.py."""
import gc import gc
import os
import socket import socket
import sys
import unittest import unittest
from unittest import mock from unittest import mock
try: try:
...@@ -583,6 +585,40 @@ class StreamReaderTests(unittest.TestCase): ...@@ -583,6 +585,40 @@ class StreamReaderTests(unittest.TestCase):
server.stop() server.stop()
self.assertEqual(msg, b"hello world!\n") self.assertEqual(msg, b"hello world!\n")
@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
# See Tulip issue 168. This test is derived from the example
# subprocess_attach_read_pipe.py, but we configure the
# StreamReader's limit so that twice it is less than the size
# of the data writter. Also we must explicitly attach a child
# watcher to the event loop.
watcher = asyncio.get_child_watcher()
watcher.attach_loop(self.loop)
code = """\
import os, sys
fd = int(sys.argv[1])
os.write(fd, b'data')
os.close(fd)
"""
rfd, wfd = os.pipe()
args = [sys.executable, '-c', code, str(wfd)]
pipe = open(rfd, 'rb', 0)
reader = asyncio.StreamReader(loop=self.loop, limit=1)
protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
transport, _ = self.loop.run_until_complete(
self.loop.connect_read_pipe(lambda: protocol, pipe))
proc = self.loop.run_until_complete(
asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
self.loop.run_until_complete(proc.wait())
os.close(wfd)
data = self.loop.run_until_complete(reader.read(-1))
self.assertEqual(data, b'data')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment