Commit c2c12e43 authored by Victor Stinner's avatar Victor Stinner

Issue #23198: Reactor asyncio.StreamReader

- Add a new _wakeup_waiter() method
- Replace _create_waiter() method with a _wait_for_data() coroutine function
- Use the value None instead of True or False to wake up the waiter
parent 231b404c
...@@ -314,7 +314,7 @@ class StreamReader: ...@@ -314,7 +314,7 @@ class StreamReader:
self._loop = loop self._loop = loop
self._buffer = bytearray() self._buffer = bytearray()
self._eof = False # Whether we're done. self._eof = False # Whether we're done.
self._waiter = None # A future. self._waiter = None # A future used by _wait_for_data()
self._exception = None self._exception = None
self._transport = None self._transport = None
self._paused = False self._paused = False
...@@ -331,6 +331,14 @@ class StreamReader: ...@@ -331,6 +331,14 @@ class StreamReader:
if not waiter.cancelled(): if not waiter.cancelled():
waiter.set_exception(exc) waiter.set_exception(exc)
def _wakeup_waiter(self):
"""Wakeup read() or readline() function waiting for data or EOF."""
waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(None)
def set_transport(self, transport): def set_transport(self, transport):
assert self._transport is None, 'Transport already set' assert self._transport is None, 'Transport already set'
self._transport = transport self._transport = transport
...@@ -342,11 +350,7 @@ class StreamReader: ...@@ -342,11 +350,7 @@ class StreamReader:
def feed_eof(self): def feed_eof(self):
self._eof = True self._eof = True
waiter = self._waiter self._wakeup_waiter()
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(True)
def at_eof(self): def at_eof(self):
"""Return True if the buffer is empty and 'feed_eof' was called.""" """Return True if the buffer is empty and 'feed_eof' was called."""
...@@ -359,12 +363,7 @@ class StreamReader: ...@@ -359,12 +363,7 @@ class StreamReader:
return return
self._buffer.extend(data) self._buffer.extend(data)
self._wakeup_waiter()
waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(False)
if (self._transport is not None and if (self._transport is not None and
not self._paused and not self._paused and
...@@ -379,7 +378,8 @@ class StreamReader: ...@@ -379,7 +378,8 @@ class StreamReader:
else: else:
self._paused = True self._paused = True
def _create_waiter(self, func_name): def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called."""
# StreamReader uses a future to link the protocol feed_data() method # StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time # to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know # would have an unexpected behaviour. It would not possible to know
...@@ -387,7 +387,12 @@ class StreamReader: ...@@ -387,7 +387,12 @@ class StreamReader:
if self._waiter is not None: if self._waiter is not None:
raise RuntimeError('%s() called while another coroutine is ' raise RuntimeError('%s() called while another coroutine is '
'already waiting for incoming data' % func_name) 'already waiting for incoming data' % func_name)
return futures.Future(loop=self._loop)
self._waiter = futures.Future(loop=self._loop)
try:
yield from self._waiter
finally:
self._waiter = None
@coroutine @coroutine
def readline(self): def readline(self):
...@@ -417,11 +422,7 @@ class StreamReader: ...@@ -417,11 +422,7 @@ class StreamReader:
break break
if not_enough: if not_enough:
self._waiter = self._create_waiter('readline') yield from self._wait_for_data('readline')
try:
yield from self._waiter
finally:
self._waiter = None
self._maybe_resume_transport() self._maybe_resume_transport()
return bytes(line) return bytes(line)
...@@ -448,11 +449,7 @@ class StreamReader: ...@@ -448,11 +449,7 @@ class StreamReader:
return b''.join(blocks) return b''.join(blocks)
else: else:
if not self._buffer and not self._eof: if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read') yield from self._wait_for_data('read')
try:
yield from self._waiter
finally:
self._waiter = None
if n < 0 or len(self._buffer) <= n: if n < 0 or len(self._buffer) <= n:
data = bytes(self._buffer) data = bytes(self._buffer)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment