Commit 6b5a2797 authored by Andrew Svetlov's avatar Andrew Svetlov Committed by GitHub

bpo-32410: Implement loop.sock_sendfile() (#4976)

parent c495e799
......@@ -701,6 +701,36 @@ Low-level socket operations
:meth:`AbstractEventLoop.create_server` and :func:`start_server`.
.. coroutinemethod:: AbstractEventLoop.sock_sendfile(sock, file, \
offset=0, count=None, \
*, fallback=True)
Send a file using high-performance :mod:`os.sendfile` if possible
and return the total number of bytes which were sent.
Asynchronous version of :meth:`socket.socket.sendfile`.
*sock* must be non-blocking :class:`~socket.socket` of
:const:`socket.SOCK_STREAM` type.
*file* must be a regular file object opened in binary mode.
*offset* tells from where to start reading the file. If specified,
*count* is the total number of bytes to transmit as opposed to
sending the file until EOF is reached. File position is updated on
return or also in case of error in which case :meth:`file.tell()
<io.IOBase.tell>` can be used to figure out the number of bytes
which were sent.
*fallback* set to ``True`` makes asyncio to manually read and send
the file when the platform does not support the sendfile syscall
(e.g. Windows or SSL socket on Unix).
Raise :exc:`RuntimeError` if the system does not support
*sendfile* syscall and *fallback* is ``False``.
.. versionadded:: 3.7
Resolve host name
-----------------
......
......@@ -154,6 +154,10 @@ def _run_until_complete_cb(fut):
futures._get_loop(fut).stop()
class _SendfileNotAvailable(RuntimeError):
pass
class Server(events.AbstractServer):
def __init__(self, loop, sockets):
......@@ -647,6 +651,72 @@ class BaseEventLoop(events.AbstractEventLoop):
return await self.run_in_executor(
None, socket.getnameinfo, sockaddr, flags)
async def sock_sendfile(self, sock, file, offset=0, count=None,
*, fallback=True):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
self._check_sendfile_params(sock, file, offset, count)
try:
return await self._sock_sendfile_native(sock, file,
offset, count)
except _SendfileNotAvailable as exc:
if fallback:
return await self._sock_sendfile_fallback(sock, file,
offset, count)
else:
raise RuntimeError(exc.args[0]) from None
async def _sock_sendfile_native(self, sock, file, offset, count):
# NB: sendfile syscall is not supported for SSL sockets and
# non-mmap files even if sendfile is supported by OS
raise _SendfileNotAvailable(
f"syscall sendfile is not available for socket {sock!r} "
"and file {file!r} combination")
async def _sock_sendfile_fallback(self, sock, file, offset, count):
if offset:
file.seek(offset)
blocksize = min(count, 16384) if count else 16384
buf = bytearray(blocksize)
total_sent = 0
try:
while True:
if count:
blocksize = min(count - total_sent, blocksize)
if blocksize <= 0:
break
view = memoryview(buf)[:blocksize]
read = file.readinto(view)
if not read:
break # EOF
await self.sock_sendall(sock, view)
total_sent += read
return total_sent
finally:
if total_sent > 0 and hasattr(file, 'seek'):
file.seek(offset + total_sent)
def _check_sendfile_params(self, sock, file, offset, count):
if 'b' not in getattr(file, 'mode', 'b'):
raise ValueError("file should be opened in binary mode")
if not sock.type == socket.SOCK_STREAM:
raise ValueError("only SOCK_STREAM type sockets are supported")
if count is not None:
if not isinstance(count, int):
raise TypeError(
"count must be a positive integer (got {!r})".format(count))
if count <= 0:
raise ValueError(
"count must be a positive integer (got {!r})".format(count))
if not isinstance(offset, int):
raise TypeError(
"offset must be a non-negative integer (got {!r})".format(
offset))
if offset < 0:
raise ValueError(
"offset must be a non-negative integer (got {!r})".format(
offset))
async def create_connection(
self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
......
......@@ -464,6 +464,10 @@ class AbstractEventLoop:
async def sock_accept(self, sock):
raise NotImplementedError
async def sock_sendfile(self, sock, file, offset=0, count=None,
*, fallback=None):
raise NotImplementedError
# Signal handling.
def add_signal_handler(self, sig, callback, *args):
......
"""Selector event loop for Unix with signal handling."""
import errno
import io
import os
import selectors
import signal
......@@ -308,6 +309,98 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
ssl_handshake_timeout=ssl_handshake_timeout)
return server
async def _sock_sendfile_native(self, sock, file, offset, count):
try:
os.sendfile
except AttributeError as exc:
raise base_events._SendfileNotAvailable(
"os.sendfile() is not available")
try:
fileno = file.fileno()
except (AttributeError, io.UnsupportedOperation) as err:
raise base_events._SendfileNotAvailable("not a regular file")
try:
fsize = os.fstat(fileno).st_size
except OSError as err:
raise base_events._SendfileNotAvailable("not a regular file")
blocksize = count if count else fsize
if not blocksize:
return 0 # empty file
fut = self.create_future()
self._sock_sendfile_native_impl(fut, None, sock, fileno,
offset, count, blocksize, 0)
return await fut
def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
offset, count, blocksize, total_sent):
fd = sock.fileno()
if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_writer(registered_fd)
if fut.cancelled():
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
return
if count:
blocksize = count - total_sent
if blocksize <= 0:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_result(total_sent)
return
try:
sent = os.sendfile(fd, fileno, offset, blocksize)
except (BlockingIOError, InterruptedError):
if registered_fd is None:
self._sock_add_cancellation_callback(fut, sock)
self.add_writer(fd, self._sock_sendfile_native_impl, fut,
fd, sock, fileno,
offset, count, blocksize, total_sent)
except OSError as exc:
if total_sent == 0:
# We can get here for different reasons, the main
# one being 'file' is not a regular mmap(2)-like
# file, in which case we'll fall back on using
# plain send().
err = base_events._SendfileNotAvailable(
"os.sendfile call failed")
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(err)
else:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(exc)
except Exception as exc:
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_exception(exc)
else:
if sent == 0:
# EOF
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
fut.set_result(total_sent)
else:
offset += sent
total_sent += sent
if registered_fd is None:
self._sock_add_cancellation_callback(fut, sock)
self.add_writer(fd, self._sock_sendfile_native_impl, fut,
fd, sock, fileno,
offset, count, blocksize, total_sent)
def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
if total_sent > 0:
os.lseek(fileno, offset, os.SEEK_SET)
def _sock_add_cancellation_callback(self, fut, sock):
def cb(fut):
if fut.cancelled():
fd = sock.fileno()
if fd != -1:
self.remove_writer(fd)
fut.add_done_callback(cb)
class _UnixReadPipeTransport(transports.ReadTransport):
......
......@@ -1787,5 +1787,163 @@ class RunningLoopTests(unittest.TestCase):
outer_loop.close()
class BaseLoopSendfileTests(test_utils.TestCase):
DATA = b"12345abcde" * 16 * 1024 # 160 KiB
class MyProto(asyncio.Protocol):
def __init__(self, loop):
self.started = False
self.closed = False
self.data = bytearray()
self.fut = loop.create_future()
def connection_made(self, transport):
self.started = True
def data_received(self, data):
self.data.extend(data)
def connection_lost(self, exc):
self.closed = True
self.fut.set_result(None)
async def wait_closed(self):
await self.fut
@classmethod
def setUpClass(cls):
with open(support.TESTFN, 'wb') as fp:
fp.write(cls.DATA)
super().setUpClass()
@classmethod
def tearDownClass(cls):
support.unlink(support.TESTFN)
super().tearDownClass()
def setUp(self):
from asyncio.selector_events import BaseSelectorEventLoop
# BaseSelectorEventLoop() has no native implementation
self.loop = BaseSelectorEventLoop()
self.set_event_loop(self.loop)
self.file = open(support.TESTFN, 'rb')
self.addCleanup(self.file.close)
super().setUp()
def make_socket(self, blocking=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(blocking)
self.addCleanup(sock.close)
return sock
def run_loop(self, coro):
return self.loop.run_until_complete(coro)
def prepare(self):
sock = self.make_socket()
proto = self.MyProto(self.loop)
port = support.find_unused_port()
server = self.run_loop(self.loop.create_server(
lambda: proto, support.HOST, port))
self.run_loop(self.loop.sock_connect(sock, (support.HOST, port)))
def cleanup():
server.close()
self.run_loop(server.wait_closed())
self.addCleanup(cleanup)
return sock, proto
def test__sock_sendfile_native_failure(self):
sock, proto = self.prepare()
with self.assertRaisesRegex(base_events._SendfileNotAvailable,
"sendfile is not available"):
self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
0, None))
self.assertEqual(proto.data, b'')
self.assertEqual(self.file.tell(), 0)
def test_sock_sendfile_no_fallback(self):
sock, proto = self.prepare()
with self.assertRaisesRegex(RuntimeError,
"sendfile is not available"):
self.run_loop(self.loop.sock_sendfile(sock, self.file,
fallback=False))
self.assertEqual(self.file.tell(), 0)
self.assertEqual(proto.data, b'')
def test_sock_sendfile_fallback(self):
sock, proto = self.prepare()
ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(ret, len(self.DATA))
self.assertEqual(self.file.tell(), len(self.DATA))
self.assertEqual(proto.data, self.DATA)
def test_sock_sendfile_fallback_offset_and_count(self):
sock, proto = self.prepare()
ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
1000, 2000))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(ret, 2000)
self.assertEqual(self.file.tell(), 3000)
self.assertEqual(proto.data, self.DATA[1000:3000])
def test_blocking_socket(self):
self.loop.set_debug(True)
sock = self.make_socket(blocking=True)
with self.assertRaisesRegex(ValueError, "must be non-blocking"):
self.run_loop(self.loop.sock_sendfile(sock, self.file))
def test_nonbinary_file(self):
sock = self.make_socket()
with open(support.TESTFN, 'r') as f:
with self.assertRaisesRegex(ValueError, "binary mode"):
self.run_loop(self.loop.sock_sendfile(sock, f))
def test_nonstream_socket(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(sock.close)
with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
self.run_loop(self.loop.sock_sendfile(sock, self.file))
def test_notint_count(self):
sock = self.make_socket()
with self.assertRaisesRegex(TypeError,
"count must be a positive integer"):
self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))
def test_negative_count(self):
sock = self.make_socket()
with self.assertRaisesRegex(ValueError,
"count must be a positive integer"):
self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))
def test_notint_offset(self):
sock = self.make_socket()
with self.assertRaisesRegex(TypeError,
"offset must be a non-negative integer"):
self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))
def test_negative_offset(self):
sock = self.make_socket()
with self.assertRaisesRegex(ValueError,
"offset must be a non-negative integer"):
self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
if __name__ == '__main__':
unittest.main()
......@@ -2555,6 +2555,8 @@ class AbstractEventLoopTests(unittest.TestCase):
await loop.sock_connect(f, f)
with self.assertRaises(NotImplementedError):
await loop.sock_accept(f)
with self.assertRaises(NotImplementedError):
await loop.sock_sendfile(f, mock.Mock())
with self.assertRaises(NotImplementedError):
await loop.connect_read_pipe(f, mock.sentinel.pipe)
with self.assertRaises(NotImplementedError):
......
"""Tests for unix_events.py."""
import collections
import contextlib
import errno
import io
import os
......@@ -21,6 +22,7 @@ if sys.platform == 'win32':
import asyncio
from asyncio import log
from asyncio import base_events
from asyncio import unix_events
from test.test_asyncio import utils as test_utils
......@@ -417,6 +419,255 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
self.loop.run_until_complete(coro)
@unittest.skipUnless(hasattr(os, 'sendfile'),
'sendfile is not supported')
class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
DATA = b"12345abcde" * 16 * 1024 # 160 KiB
class MyProto(asyncio.Protocol):
def __init__(self, loop):
self.started = False
self.closed = False
self.data = bytearray()
self.fut = loop.create_future()
self.transport = None
def connection_made(self, transport):
self.started = True
self.transport = transport
def data_received(self, data):
self.data.extend(data)
def connection_lost(self, exc):
self.closed = True
self.fut.set_result(None)
async def wait_closed(self):
await self.fut
@classmethod
def setUpClass(cls):
with open(support.TESTFN, 'wb') as fp:
fp.write(cls.DATA)
super().setUpClass()
@classmethod
def tearDownClass(cls):
support.unlink(support.TESTFN)
super().tearDownClass()
def setUp(self):
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
self.file = open(support.TESTFN, 'rb')
self.addCleanup(self.file.close)
super().setUp()
def make_socket(self, blocking=False):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(blocking)
self.addCleanup(sock.close)
return sock
def run_loop(self, coro):
return self.loop.run_until_complete(coro)
def prepare(self):
sock = self.make_socket()
proto = self.MyProto(self.loop)
port = support.find_unused_port()
server = self.run_loop(self.loop.create_server(
lambda: proto, support.HOST, port))
self.run_loop(self.loop.sock_connect(sock, (support.HOST, port)))
def cleanup():
proto.transport.close()
self.run_loop(proto.wait_closed())
server.close()
self.run_loop(server.wait_closed())
self.addCleanup(cleanup)
return sock, proto
def test_success(self):
sock, proto = self.prepare()
ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(ret, len(self.DATA))
self.assertEqual(proto.data, self.DATA)
self.assertEqual(self.file.tell(), len(self.DATA))
def test_with_offset_and_count(self):
sock, proto = self.prepare()
ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
1000, 2000))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(proto.data, self.DATA[1000:3000])
self.assertEqual(self.file.tell(), 3000)
self.assertEqual(ret, 2000)
def test_sendfile_not_available(self):
sock, proto = self.prepare()
with mock.patch('asyncio.unix_events.os', spec=[]):
with self.assertRaisesRegex(base_events._SendfileNotAvailable,
"os[.]sendfile[(][)] is not available"):
self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
0, None))
self.assertEqual(self.file.tell(), 0)
def test_sendfile_not_a_file(self):
sock, proto = self.prepare()
f = object()
with self.assertRaisesRegex(base_events._SendfileNotAvailable,
"not a regular file"):
self.run_loop(self.loop._sock_sendfile_native(sock, f,
0, None))
self.assertEqual(self.file.tell(), 0)
def test_sendfile_iobuffer(self):
sock, proto = self.prepare()
f = io.BytesIO()
with self.assertRaisesRegex(base_events._SendfileNotAvailable,
"not a regular file"):
self.run_loop(self.loop._sock_sendfile_native(sock, f,
0, None))
self.assertEqual(self.file.tell(), 0)
def test_sendfile_not_regular_file(self):
sock, proto = self.prepare()
f = mock.Mock()
f.fileno.return_value = -1
with self.assertRaisesRegex(base_events._SendfileNotAvailable,
"not a regular file"):
self.run_loop(self.loop._sock_sendfile_native(sock, f,
0, None))
self.assertEqual(self.file.tell(), 0)
def test_sendfile_zero_size(self):
sock, proto = self.prepare()
fname = support.TESTFN + '.suffix'
with open(fname, 'wb') as f:
pass # make zero sized file
f = open(fname, 'rb')
self.addCleanup(f.close)
self.addCleanup(support.unlink, fname)
ret = self.run_loop(self.loop._sock_sendfile_native(sock, f,
0, None))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(ret, 0)
self.assertEqual(self.file.tell(), 0)
def test_mix_sendfile_and_regular_send(self):
buf = b'1234567890' * 1024 * 1024 # 10 MB
sock, proto = self.prepare()
self.run_loop(self.loop.sock_sendall(sock, buf))
ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
self.run_loop(self.loop.sock_sendall(sock, buf))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(ret, len(self.DATA))
expected = buf + self.DATA + buf
self.assertEqual(proto.data, expected)
self.assertEqual(self.file.tell(), len(self.DATA))
def test_cancel1(self):
sock, proto = self.prepare()
fut = self.loop.create_future()
fileno = self.file.fileno()
self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
0, None, len(self.DATA), 0)
fut.cancel()
with contextlib.suppress(asyncio.CancelledError):
self.run_loop(fut)
with self.assertRaises(KeyError):
self.loop._selector.get_key(sock)
def test_cancel2(self):
sock, proto = self.prepare()
fut = self.loop.create_future()
fileno = self.file.fileno()
self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
0, None, len(self.DATA), 0)
fut.cancel()
self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno,
0, None, len(self.DATA), 0)
with self.assertRaises(KeyError):
self.loop._selector.get_key(sock)
def test_blocking_error(self):
sock, proto = self.prepare()
fileno = self.file.fileno()
fut = mock.Mock()
fut.cancelled.return_value = False
with mock.patch('os.sendfile', side_effect=BlockingIOError()):
self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
0, None, len(self.DATA), 0)
key = self.loop._selector.get_key(sock)
self.assertIsNotNone(key)
fut.add_done_callback.assert_called_once_with(mock.ANY)
def test_os_error_first_call(self):
sock, proto = self.prepare()
fileno = self.file.fileno()
fut = self.loop.create_future()
with mock.patch('os.sendfile', side_effect=OSError()):
self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
0, None, len(self.DATA), 0)
with self.assertRaises(KeyError):
self.loop._selector.get_key(sock)
exc = fut.exception()
self.assertIsInstance(exc, base_events._SendfileNotAvailable)
self.assertEqual(0, self.file.tell())
def test_os_error_next_call(self):
sock, proto = self.prepare()
fileno = self.file.fileno()
fut = self.loop.create_future()
err = OSError()
with mock.patch('os.sendfile', side_effect=err):
self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
sock, fileno,
1000, None, len(self.DATA),
1000)
with self.assertRaises(KeyError):
self.loop._selector.get_key(sock)
exc = fut.exception()
self.assertIs(exc, err)
self.assertEqual(1000, self.file.tell())
def test_exception(self):
sock, proto = self.prepare()
fileno = self.file.fileno()
fut = self.loop.create_future()
err = RuntimeError()
with mock.patch('os.sendfile', side_effect=err):
self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
sock, fileno,
1000, None, len(self.DATA),
1000)
with self.assertRaises(KeyError):
self.loop._selector.get_key(sock)
exc = fut.exception()
self.assertIs(exc, err)
self.assertEqual(1000, self.file.tell())
class UnixReadPipeTransportTests(test_utils.TestCase):
......
Implement ``loop.sock_sendfile`` for asyncio event loop.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment