Commit d757aaf9 authored by Yury Selivanov's avatar Yury Selivanov Committed by GitHub

bpo-32356: idempotent pause_/resume_reading; new is_reading method. (#4914)

parent 2d8f0638
...@@ -118,17 +118,31 @@ ReadTransport ...@@ -118,17 +118,31 @@ ReadTransport
Interface for read-only transports. Interface for read-only transports.
.. method:: is_reading()
Return ``True`` if the transport is receiving new data.
.. versionadded:: 3.7
.. method:: pause_reading() .. method:: pause_reading()
Pause the receiving end of the transport. No data will be passed to Pause the receiving end of the transport. No data will be passed to
the protocol's :meth:`data_received` method until :meth:`resume_reading` the protocol's :meth:`data_received` method until :meth:`resume_reading`
is called. is called.
.. versionchanged:: 3.7
The method is idempotent, i.e. it can be called when the
transport is already paused or closed.
.. method:: resume_reading() .. method:: resume_reading()
Resume the receiving end. The protocol's :meth:`data_received` method Resume the receiving end. The protocol's :meth:`data_received` method
will be called once again if some data is available for reading. will be called once again if some data is available for reading.
.. versionchanged:: 3.7
The method is idempotent, i.e. it can be called when the
transport is already reading.
WriteTransport WriteTransport
-------------- --------------
......
...@@ -152,21 +152,20 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport, ...@@ -152,21 +152,20 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._paused = False self._paused = False
self._loop.call_soon(self._loop_reading) self._loop.call_soon(self._loop_reading)
def is_reading(self):
return not self._paused and not self._closing
def pause_reading(self): def pause_reading(self):
if self._closing: if self._closing or self._paused:
raise RuntimeError('Cannot pause_reading() when closing') return
if self._paused:
raise RuntimeError('Already paused')
self._paused = True self._paused = True
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug("%r pauses reading", self) logger.debug("%r pauses reading", self)
def resume_reading(self): def resume_reading(self):
if not self._paused: if self._closing or not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
return return
self._paused = False
self._loop.call_soon(self._loop_reading, self._read_fut) self._loop.call_soon(self._loop_reading, self._read_fut)
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug("%r resumes reading", self) logger.debug("%r resumes reading", self)
......
...@@ -702,22 +702,21 @@ class _SelectorSocketTransport(_SelectorTransport): ...@@ -702,22 +702,21 @@ class _SelectorSocketTransport(_SelectorTransport):
self._loop.call_soon(futures._set_result_unless_cancelled, self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None) waiter, None)
def is_reading(self):
return not self._paused and not self._closing
def pause_reading(self): def pause_reading(self):
if self._closing: if self._closing or self._paused:
raise RuntimeError('Cannot pause_reading() when closing') return
if self._paused:
raise RuntimeError('Already paused')
self._paused = True self._paused = True
self._loop._remove_reader(self._sock_fd) self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug("%r pauses reading", self) logger.debug("%r pauses reading", self)
def resume_reading(self): def resume_reading(self):
if not self._paused: if self._closing or not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
return return
self._paused = False
self._loop._add_reader(self._sock_fd, self._read_ready) self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug("%r resumes reading", self) logger.debug("%r resumes reading", self)
......
...@@ -317,6 +317,12 @@ class _SSLProtocolTransport(transports._FlowControlMixin, ...@@ -317,6 +317,12 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
source=self) source=self)
self.close() self.close()
def is_reading(self):
tr = self._ssl_protocol._transport
if tr is None:
raise RuntimeError('SSL transport has not been initialized yet')
return tr.is_reading()
def pause_reading(self): def pause_reading(self):
"""Pause the receiving end. """Pause the receiving end.
......
...@@ -44,6 +44,10 @@ class BaseTransport: ...@@ -44,6 +44,10 @@ class BaseTransport:
class ReadTransport(BaseTransport): class ReadTransport(BaseTransport):
"""Interface for read-only transports.""" """Interface for read-only transports."""
def is_reading(self):
"""Return True if the transport is receiving."""
raise NotImplementedError
def pause_reading(self): def pause_reading(self):
"""Pause the receiving end. """Pause the receiving end.
......
...@@ -334,26 +334,36 @@ class ProactorSocketTransportTests(test_utils.TestCase): ...@@ -334,26 +334,36 @@ class ProactorSocketTransportTests(test_utils.TestCase):
f = asyncio.Future(loop=self.loop) f = asyncio.Future(loop=self.loop)
f.set_result(msg) f.set_result(msg)
futures.append(f) futures.append(f)
self.loop._proactor.recv.side_effect = futures self.loop._proactor.recv.side_effect = futures
self.loop._run_once() self.loop._run_once()
self.assertFalse(tr._paused) self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once() self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data1') self.protocol.data_received.assert_called_with(b'data1')
self.loop._run_once() self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2') self.protocol.data_received.assert_called_with(b'data2')
tr.pause_reading()
tr.pause_reading() tr.pause_reading()
self.assertTrue(tr._paused) self.assertTrue(tr._paused)
self.assertFalse(tr.is_reading())
for i in range(10): for i in range(10):
self.loop._run_once() self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2') self.protocol.data_received.assert_called_with(b'data2')
tr.resume_reading()
tr.resume_reading() tr.resume_reading()
self.assertFalse(tr._paused) self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once() self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data3') self.protocol.data_received.assert_called_with(b'data3')
self.loop._run_once() self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data4') self.protocol.data_received.assert_called_with(b'data4')
tr.close() tr.close()
self.assertFalse(tr.is_reading())
def pause_writing_transport(self, high): def pause_writing_transport(self, high):
tr = self.socket_transport() tr = self.socket_transport()
......
...@@ -80,10 +80,23 @@ class BaseSelectorEventLoopTests(test_utils.TestCase): ...@@ -80,10 +80,23 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
with test_utils.disable_logger(): with test_utils.disable_logger():
transport = self.loop._make_ssl_transport( transport = self.loop._make_ssl_transport(
m, asyncio.Protocol(), m, waiter) m, asyncio.Protocol(), m, waiter)
with self.assertRaisesRegex(RuntimeError,
r'SSL transport.*not.*initialized'):
transport.is_reading()
# execute the handshake while the logger is disabled # execute the handshake while the logger is disabled
# to ignore SSL handshake failure # to ignore SSL handshake failure
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
self.assertTrue(transport.is_reading())
transport.pause_reading()
transport.pause_reading()
self.assertFalse(transport.is_reading())
transport.resume_reading()
transport.resume_reading()
self.assertTrue(transport.is_reading())
# Sanity check # Sanity check
class_name = transport.__class__.__name__ class_name = transport.__class__.__name__
self.assertIn("ssl", class_name.lower()) self.assertIn("ssl", class_name.lower())
...@@ -894,15 +907,24 @@ class SelectorSocketTransportTests(test_utils.TestCase): ...@@ -894,15 +907,24 @@ class SelectorSocketTransportTests(test_utils.TestCase):
tr = self.socket_transport() tr = self.socket_transport()
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
self.assertFalse(tr._paused) self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop.assert_reader(7, tr._read_ready) self.loop.assert_reader(7, tr._read_ready)
tr.pause_reading()
tr.pause_reading() tr.pause_reading()
self.assertTrue(tr._paused) self.assertTrue(tr._paused)
self.assertFalse(7 in self.loop.readers) self.assertFalse(tr.is_reading())
self.loop.assert_no_reader(7)
tr.resume_reading()
tr.resume_reading() tr.resume_reading()
self.assertFalse(tr._paused) self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop.assert_reader(7, tr._read_ready) self.loop.assert_reader(7, tr._read_ready)
with self.assertRaises(RuntimeError):
tr.resume_reading() tr.close()
self.assertFalse(tr.is_reading())
self.loop.assert_no_reader(7)
def test_read_ready(self): def test_read_ready(self):
transport = self.socket_transport() transport = self.socket_transport()
......
...@@ -327,12 +327,19 @@ class TestLoop(base_events.BaseEventLoop): ...@@ -327,12 +327,19 @@ class TestLoop(base_events.BaseEventLoop):
return False return False
def assert_reader(self, fd, callback, *args): def assert_reader(self, fd, callback, *args):
assert fd in self.readers, 'fd {} is not registered'.format(fd) if fd not in self.readers:
raise AssertionError(f'fd {fd} is not registered')
handle = self.readers[fd] handle = self.readers[fd]
assert handle._callback == callback, '{!r} != {!r}'.format( if handle._callback != callback:
handle._callback, callback) raise AssertionError(
assert handle._args == args, '{!r} != {!r}'.format( f'unexpected callback: {handle._callback} != {callback}')
handle._args, args) if handle._args != args:
raise AssertionError(
f'unexpected callback args: {handle._args} != {args}')
def assert_no_reader(self, fd):
if fd in self.readers:
raise AssertionError(f'fd {fd} is registered')
def _add_writer(self, fd, callback, *args): def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self) self.writers[fd] = events.Handle(callback, args, self)
......
asyncio.transport.resume_reading() and pause_reading() are now idempotent.
New transport.is_reading() method is added.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment