Commit a96d8fe7 authored by Yury Selivanov's avatar Yury Selivanov

Issue #26909: Fix slow pipes IO in asyncio.

Patch by INADA Naoki.
parent f31cd150
...@@ -434,7 +434,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, ...@@ -434,7 +434,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
self._pipe = pipe self._pipe = pipe
self._fileno = pipe.fileno() self._fileno = pipe.fileno()
self._protocol = protocol self._protocol = protocol
self._buffer = [] self._buffer = bytearray()
self._conn_lost = 0 self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called. self._closing = False # Set when close() or write_eof() called.
...@@ -450,7 +450,6 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, ...@@ -450,7 +450,6 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
"pipes, sockets and character devices") "pipes, sockets and character devices")
_set_nonblocking(self._fileno) _set_nonblocking(self._fileno)
self._loop.call_soon(self._protocol.connection_made, self) self._loop.call_soon(self._protocol.connection_made, self)
# On AIX, the reader trick (to be notified when the read end of the # On AIX, the reader trick (to be notified when the read end of the
...@@ -492,7 +491,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, ...@@ -492,7 +491,7 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
return '<%s>' % ' '.join(info) return '<%s>' % ' '.join(info)
def get_write_buffer_size(self): def get_write_buffer_size(self):
return sum(len(data) for data in self._buffer) return len(self._buffer)
def _read_ready(self): def _read_ready(self):
# Pipe was closed by peer. # Pipe was closed by peer.
...@@ -530,39 +529,37 @@ class _UnixWritePipeTransport(transports._FlowControlMixin, ...@@ -530,39 +529,37 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
if n == len(data): if n == len(data):
return return
elif n > 0: elif n > 0:
data = data[n:] data = memoryview(data)[n:]
self._loop.add_writer(self._fileno, self._write_ready) self._loop.add_writer(self._fileno, self._write_ready)
self._buffer.append(data) self._buffer += data
self._maybe_pause_protocol() self._maybe_pause_protocol()
def _write_ready(self): def _write_ready(self):
data = b''.join(self._buffer) assert self._buffer, 'Data should not be empty'
assert data, 'Data should not be empty'
self._buffer.clear()
try: try:
n = os.write(self._fileno, data) n = os.write(self._fileno, self._buffer)
except (BlockingIOError, InterruptedError): except (BlockingIOError, InterruptedError):
self._buffer.append(data) pass
except Exception as exc: except Exception as exc:
self._buffer.clear()
self._conn_lost += 1 self._conn_lost += 1
# Remove writer here, _fatal_error() doesn't it # Remove writer here, _fatal_error() doesn't it
# because _buffer is empty. # because _buffer is empty.
self._loop.remove_writer(self._fileno) self._loop.remove_writer(self._fileno)
self._fatal_error(exc, 'Fatal write error on pipe transport') self._fatal_error(exc, 'Fatal write error on pipe transport')
else: else:
if n == len(data): if n == len(self._buffer):
self._buffer.clear()
self._loop.remove_writer(self._fileno) self._loop.remove_writer(self._fileno)
self._maybe_resume_protocol() # May append to buffer. self._maybe_resume_protocol() # May append to buffer.
if not self._buffer and self._closing: if self._closing:
self._loop.remove_reader(self._fileno) self._loop.remove_reader(self._fileno)
self._call_connection_lost(None) self._call_connection_lost(None)
return return
elif n > 0: elif n > 0:
data = data[n:] del self._buffer[:n]
self._buffer.append(data) # Try again later.
def can_write_eof(self): def can_write_eof(self):
return True return True
......
...@@ -518,7 +518,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase): ...@@ -518,7 +518,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
tr.write(b'data') tr.write(b'data')
m_write.assert_called_with(5, b'data') m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers) self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer) self.assertEqual(bytearray(), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test_write_no_data(self, m_write): def test_write_no_data(self, m_write):
...@@ -526,35 +526,34 @@ class UnixWritePipeTransportTests(test_utils.TestCase): ...@@ -526,35 +526,34 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
tr.write(b'') tr.write(b'')
self.assertFalse(m_write.called) self.assertFalse(m_write.called)
self.assertFalse(self.loop.writers) self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer) self.assertEqual(bytearray(b''), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test_write_partial(self, m_write): def test_write_partial(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
m_write.return_value = 2 m_write.return_value = 2
tr.write(b'data') tr.write(b'data')
m_write.assert_called_with(5, b'data')
self.loop.assert_writer(5, tr._write_ready) self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'ta'], tr._buffer) self.assertEqual(bytearray(b'ta'), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test_write_buffer(self, m_write): def test_write_buffer(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'previous'] tr._buffer = bytearray(b'previous')
tr.write(b'data') tr.write(b'data')
self.assertFalse(m_write.called) self.assertFalse(m_write.called)
self.loop.assert_writer(5, tr._write_ready) self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'previous', b'data'], tr._buffer) self.assertEqual(bytearray(b'previousdata'), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test_write_again(self, m_write): def test_write_again(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
m_write.side_effect = BlockingIOError() m_write.side_effect = BlockingIOError()
tr.write(b'data') tr.write(b'data')
m_write.assert_called_with(5, b'data') m_write.assert_called_with(5, bytearray(b'data'))
self.loop.assert_writer(5, tr._write_ready) self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer) self.assertEqual(bytearray(b'data'), tr._buffer)
@mock.patch('asyncio.unix_events.logger') @mock.patch('asyncio.unix_events.logger')
@mock.patch('os.write') @mock.patch('os.write')
...@@ -566,7 +565,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase): ...@@ -566,7 +565,7 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
tr.write(b'data') tr.write(b'data')
m_write.assert_called_with(5, b'data') m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers) self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer) self.assertEqual(bytearray(), tr._buffer)
tr._fatal_error.assert_called_with( tr._fatal_error.assert_called_with(
err, err,
'Fatal write error on pipe transport') 'Fatal write error on pipe transport')
...@@ -606,58 +605,55 @@ class UnixWritePipeTransportTests(test_utils.TestCase): ...@@ -606,58 +605,55 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
def test__write_ready(self, m_write): def test__write_ready(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta'] tr._buffer = bytearray(b'data')
m_write.return_value = 4 m_write.return_value = 4
tr._write_ready() tr._write_ready()
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers) self.assertFalse(self.loop.writers)
self.assertEqual([], tr._buffer) self.assertEqual(bytearray(), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test__write_ready_partial(self, m_write): def test__write_ready_partial(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta'] tr._buffer = bytearray(b'data')
m_write.return_value = 3 m_write.return_value = 3
tr._write_ready() tr._write_ready()
m_write.assert_called_with(5, b'data')
self.loop.assert_writer(5, tr._write_ready) self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'a'], tr._buffer) self.assertEqual(bytearray(b'a'), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test__write_ready_again(self, m_write): def test__write_ready_again(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta'] tr._buffer = bytearray(b'data')
m_write.side_effect = BlockingIOError() m_write.side_effect = BlockingIOError()
tr._write_ready() tr._write_ready()
m_write.assert_called_with(5, b'data') m_write.assert_called_with(5, bytearray(b'data'))
self.loop.assert_writer(5, tr._write_ready) self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer) self.assertEqual(bytearray(b'data'), tr._buffer)
@mock.patch('os.write') @mock.patch('os.write')
def test__write_ready_empty(self, m_write): def test__write_ready_empty(self, m_write):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta'] tr._buffer = bytearray(b'data')
m_write.return_value = 0 m_write.return_value = 0
tr._write_ready() tr._write_ready()
m_write.assert_called_with(5, b'data') m_write.assert_called_with(5, bytearray(b'data'))
self.loop.assert_writer(5, tr._write_ready) self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer) self.assertEqual(bytearray(b'data'), tr._buffer)
@mock.patch('asyncio.log.logger.error') @mock.patch('asyncio.log.logger.error')
@mock.patch('os.write') @mock.patch('os.write')
def test__write_ready_err(self, m_write, m_logexc): def test__write_ready_err(self, m_write, m_logexc):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._buffer = [b'da', b'ta'] tr._buffer = bytearray(b'data')
m_write.side_effect = err = OSError() m_write.side_effect = err = OSError()
tr._write_ready() tr._write_ready()
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers) self.assertFalse(self.loop.writers)
self.assertFalse(self.loop.readers) self.assertFalse(self.loop.readers)
self.assertEqual([], tr._buffer) self.assertEqual(bytearray(), tr._buffer)
self.assertTrue(tr.is_closing()) self.assertTrue(tr.is_closing())
m_logexc.assert_called_with( m_logexc.assert_called_with(
test_utils.MockPattern( test_utils.MockPattern(
...@@ -673,13 +669,12 @@ class UnixWritePipeTransportTests(test_utils.TestCase): ...@@ -673,13 +669,12 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
tr = self.write_pipe_transport() tr = self.write_pipe_transport()
self.loop.add_writer(5, tr._write_ready) self.loop.add_writer(5, tr._write_ready)
tr._closing = True tr._closing = True
tr._buffer = [b'da', b'ta'] tr._buffer = bytearray(b'data')
m_write.return_value = 4 m_write.return_value = 4
tr._write_ready() tr._write_ready()
m_write.assert_called_with(5, b'data')
self.assertFalse(self.loop.writers) self.assertFalse(self.loop.writers)
self.assertFalse(self.loop.readers) self.assertFalse(self.loop.readers)
self.assertEqual([], tr._buffer) self.assertEqual(bytearray(), tr._buffer)
self.protocol.connection_lost.assert_called_with(None) self.protocol.connection_lost.assert_called_with(None)
self.pipe.close.assert_called_with() self.pipe.close.assert_called_with()
......
...@@ -272,6 +272,9 @@ Library ...@@ -272,6 +272,9 @@ Library
- Issue #26654: Inspect functools.partial in asyncio.Handle.__repr__. - Issue #26654: Inspect functools.partial in asyncio.Handle.__repr__.
Patch by iceboy. Patch by iceboy.
- Issue #26909: Fix slow pipes IO in asyncio.
Patch by INADA Naoki.
IDLE IDLE
---- ----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment