Commit eaf16abc authored by Victor Stinner's avatar Victor Stinner

asyncio: sync with github

* Fix ResourceWarning warnings in test_streams
* Return True from StreamReader.eof_received() to fix
  http://bugs.python.org/issue24539 (but still needs a unittest).
  Add StreamReader.__repr__() for easy debugging.
* remove unused imports
* Issue #234: Drop JoinableQueue on Python 3.5+
parent 71080fc3
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore'] __all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
import collections import collections
import sys
from . import compat from . import compat
from . import events from . import events
......
"""Queues""" """Queues"""
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty', __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
'JoinableQueue']
import collections import collections
import heapq import heapq
from . import compat
from . import events from . import events
from . import futures from . import futures
from . import locks from . import locks
...@@ -289,5 +289,7 @@ class LifoQueue(Queue): ...@@ -289,5 +289,7 @@ class LifoQueue(Queue):
return self._queue.pop() return self._queue.pop()
JoinableQueue = Queue if not compat.PY35:
"""Deprecated alias for Queue.""" JoinableQueue = Queue
"""Deprecated alias for Queue."""
__all__.append('JoinableQueue')
...@@ -6,7 +6,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol', ...@@ -6,7 +6,6 @@ __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
] ]
import socket import socket
import sys
if hasattr(socket, 'AF_UNIX'): if hasattr(socket, 'AF_UNIX'):
__all__.extend(['open_unix_connection', 'start_unix_server']) __all__.extend(['open_unix_connection', 'start_unix_server'])
...@@ -240,6 +239,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): ...@@ -240,6 +239,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
def eof_received(self): def eof_received(self):
self._stream_reader.feed_eof() self._stream_reader.feed_eof()
return True
class StreamWriter: class StreamWriter:
...@@ -321,6 +321,24 @@ class StreamReader: ...@@ -321,6 +321,24 @@ class StreamReader:
self._transport = None self._transport = None
self._paused = False self._paused = False
def __repr__(self):
info = ['StreamReader']
if self._buffer:
info.append('%d bytes' % len(info))
if self._eof:
info.append('eof')
if self._limit != _DEFAULT_LIMIT:
info.append('l=%d' % self._limit)
if self._waiter:
info.append('w=%r' % self._waiter)
if self._exception:
info.append('e=%r' % self._exception)
if self._transport:
info.append('t=%r' % self._transport)
if self._paused:
info.append('paused')
return '<%s>' % ' '.join(info)
def exception(self): def exception(self):
return self._exception return self._exception
......
__all__ = ['create_subprocess_exec', 'create_subprocess_shell'] __all__ = ['create_subprocess_exec', 'create_subprocess_shell']
import collections
import subprocess import subprocess
from . import events from . import events
from . import futures
from . import protocols from . import protocols
from . import streams from . import streams
from . import tasks from . import tasks
......
...@@ -10,8 +10,6 @@ import concurrent.futures ...@@ -10,8 +10,6 @@ import concurrent.futures
import functools import functools
import inspect import inspect
import linecache import linecache
import sys
import types
import traceback import traceback
import warnings import warnings
import weakref import weakref
......
"""Abstract Transport class.""" """Abstract Transport class."""
import sys
from asyncio import compat from asyncio import compat
__all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport', __all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport',
......
...@@ -446,6 +446,8 @@ class StreamReaderTests(test_utils.TestCase): ...@@ -446,6 +446,8 @@ class StreamReaderTests(test_utils.TestCase):
def handle_client(self, client_reader, client_writer): def handle_client(self, client_reader, client_writer):
data = yield from client_reader.readline() data = yield from client_reader.readline()
client_writer.write(data) client_writer.write(data)
yield from client_writer.drain()
client_writer.close()
def start(self): def start(self):
sock = socket.socket() sock = socket.socket()
...@@ -457,12 +459,8 @@ class StreamReaderTests(test_utils.TestCase): ...@@ -457,12 +459,8 @@ class StreamReaderTests(test_utils.TestCase):
return sock.getsockname() return sock.getsockname()
def handle_client_callback(self, client_reader, client_writer): def handle_client_callback(self, client_reader, client_writer):
task = asyncio.Task(client_reader.readline(), loop=self.loop) self.loop.create_task(self.handle_client(client_reader,
client_writer))
def done(task):
client_writer.write(task.result())
task.add_done_callback(done)
def start_callback(self): def start_callback(self):
sock = socket.socket() sock = socket.socket()
...@@ -522,6 +520,8 @@ class StreamReaderTests(test_utils.TestCase): ...@@ -522,6 +520,8 @@ class StreamReaderTests(test_utils.TestCase):
def handle_client(self, client_reader, client_writer): def handle_client(self, client_reader, client_writer):
data = yield from client_reader.readline() data = yield from client_reader.readline()
client_writer.write(data) client_writer.write(data)
yield from client_writer.drain()
client_writer.close()
def start(self): def start(self):
self.server = self.loop.run_until_complete( self.server = self.loop.run_until_complete(
...@@ -530,18 +530,14 @@ class StreamReaderTests(test_utils.TestCase): ...@@ -530,18 +530,14 @@ class StreamReaderTests(test_utils.TestCase):
loop=self.loop)) loop=self.loop))
def handle_client_callback(self, client_reader, client_writer): def handle_client_callback(self, client_reader, client_writer):
task = asyncio.Task(client_reader.readline(), loop=self.loop) self.loop.create_task(self.handle_client(client_reader,
client_writer))
def done(task):
client_writer.write(task.result())
task.add_done_callback(done)
def start_callback(self): def start_callback(self):
self.server = self.loop.run_until_complete( start = asyncio.start_unix_server(self.handle_client_callback,
asyncio.start_unix_server(self.handle_client_callback, path=self.path,
path=self.path, loop=self.loop)
loop=self.loop)) self.server = self.loop.run_until_complete(start)
def stop(self): def stop(self):
if self.server is not None: if self.server is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment