Commit 525f40d2 authored by Antoine Pitrou's avatar Antoine Pitrou Committed by Yury Selivanov

bpo-31819: Add AbstractEventLoop.sock_recv_into() (#4051)

* bpo-31819: Add AbstractEventLoop.sock_recv_into()

* Add NEWS

* Add doc
parent ea2ef5d0
...@@ -554,6 +554,21 @@ Low-level socket operations ...@@ -554,6 +554,21 @@ Low-level socket operations
This method is a :ref:`coroutine <coroutine>`. This method is a :ref:`coroutine <coroutine>`.
.. coroutinemethod:: AbstractEventLoop.sock_recv_into(sock, buf)
Receive data from the socket. Modeled after blocking
:meth:`socket.socket.recv_into` method.
The received data is written into *buf* (a writable buffer).
The return value is the number of bytes written.
With :class:`SelectorEventLoop` event loop, the socket *sock* must be
non-blocking.
This method is a :ref:`coroutine <coroutine>`.
.. versionadded:: 3.7
.. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data) .. coroutinemethod:: AbstractEventLoop.sock_sendall(sock, data)
Send data to the socket. Modeled after blocking Send data to the socket. Modeled after blocking
......
...@@ -461,6 +461,9 @@ class AbstractEventLoop: ...@@ -461,6 +461,9 @@ class AbstractEventLoop:
def sock_recv(self, sock, nbytes): def sock_recv(self, sock, nbytes):
raise NotImplementedError raise NotImplementedError
def sock_recv_into(self, sock, buf):
raise NotImplementedError
def sock_sendall(self, sock, data): def sock_sendall(self, sock, data):
raise NotImplementedError raise NotImplementedError
......
...@@ -439,6 +439,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): ...@@ -439,6 +439,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
def sock_recv(self, sock, n): def sock_recv(self, sock, n):
return self._proactor.recv(sock, n) return self._proactor.recv(sock, n)
def sock_recv_into(self, sock, buf):
return self._proactor.recv_into(sock, buf)
def sock_sendall(self, sock, data): def sock_sendall(self, sock, data):
return self._proactor.send(sock, data) return self._proactor.send(sock, data)
......
...@@ -386,6 +386,41 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -386,6 +386,41 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else: else:
fut.set_result(data) fut.set_result(data)
def sock_recv_into(self, sock, buf):
"""Receive data from the socket.
The received data is written into *buf* (a writable buffer).
The return value is the number of bytes written.
This method is a coroutine.
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
self._sock_recv_into(fut, False, sock, buf)
return fut
def _sock_recv_into(self, fut, registered, sock, buf):
# _sock_recv_into() can add itself as an I/O callback if the operation
# can't be done immediately. Don't use it directly, call sock_recv_into().
fd = sock.fileno()
if registered:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_reader(fd)
if fut.cancelled():
return
try:
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(nbytes)
def sock_sendall(self, sock, data): def sock_sendall(self, sock, data):
"""Send data to the socket. """Send data to the socket.
......
...@@ -448,6 +448,28 @@ class IocpProactor: ...@@ -448,6 +448,28 @@ class IocpProactor:
return self._register(ov, conn, finish_recv) return self._register(ov, conn, finish_recv)
def recv_into(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
try:
if isinstance(conn, socket.socket):
ov.WSARecvInto(conn.fileno(), buf, flags)
else:
ov.ReadFileInto(conn.fileno(), buf)
except BrokenPipeError:
return self._result(b'')
def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
raise ConnectionResetError(*exc.args)
else:
raise
return self._register(ov, conn, finish_recv)
def send(self, conn, buf, flags=0): def send(self, conn, buf, flags=0):
self._register_with_iocp(conn) self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL) ov = _overlapped.Overlapped(NULL)
......
...@@ -425,6 +425,9 @@ class EventLoopTestsMixin: ...@@ -425,6 +425,9 @@ class EventLoopTestsMixin:
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.loop.run_until_complete( self.loop.run_until_complete(
self.loop.sock_recv(sock, 1024)) self.loop.sock_recv(sock, 1024))
with self.assertRaises(ValueError):
self.loop.run_until_complete(
self.loop.sock_recv_into(sock, bytearray()))
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.loop.run_until_complete( self.loop.run_until_complete(
self.loop.sock_accept(sock)) self.loop.sock_accept(sock))
...@@ -443,16 +446,37 @@ class EventLoopTestsMixin: ...@@ -443,16 +446,37 @@ class EventLoopTestsMixin:
sock.close() sock.close()
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK')) self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
def _basetest_sock_recv_into(self, httpd, sock):
# same as _basetest_sock_client_ops, but using sock_recv_into
sock.setblocking(False)
self.loop.run_until_complete(
self.loop.sock_connect(sock, httpd.address))
self.loop.run_until_complete(
self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
data = bytearray(1024)
with memoryview(data) as buf:
nbytes = self.loop.run_until_complete(
self.loop.sock_recv_into(sock, buf[:1024]))
# consume data
self.loop.run_until_complete(
self.loop.sock_recv_into(sock, buf[nbytes:]))
sock.close()
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
def test_sock_client_ops(self): def test_sock_client_ops(self):
with test_utils.run_test_server() as httpd: with test_utils.run_test_server() as httpd:
sock = socket.socket() sock = socket.socket()
self._basetest_sock_client_ops(httpd, sock) self._basetest_sock_client_ops(httpd, sock)
sock = socket.socket()
self._basetest_sock_recv_into(httpd, sock)
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_unix_sock_client_ops(self): def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd: with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX) sock = socket.socket(socket.AF_UNIX)
self._basetest_sock_client_ops(httpd, sock) self._basetest_sock_client_ops(httpd, sock)
sock = socket.socket(socket.AF_UNIX)
self._basetest_sock_recv_into(httpd, sock)
def test_sock_client_fail(self): def test_sock_client_fail(self):
# Make sure that we will get an unused port # Make sure that we will get an unused port
...@@ -2612,6 +2636,8 @@ class AbstractEventLoopTests(unittest.TestCase): ...@@ -2612,6 +2636,8 @@ class AbstractEventLoopTests(unittest.TestCase):
NotImplementedError, loop.remove_writer, 1) NotImplementedError, loop.remove_writer, 1)
self.assertRaises( self.assertRaises(
NotImplementedError, loop.sock_recv, f, 10) NotImplementedError, loop.sock_recv, f, 10)
self.assertRaises(
NotImplementedError, loop.sock_recv_into, f, 10)
self.assertRaises( self.assertRaises(
NotImplementedError, loop.sock_sendall, f, 10) NotImplementedError, loop.sock_sendall, f, 10)
self.assertRaises( self.assertRaises(
......
...@@ -489,6 +489,11 @@ class BaseProactorEventLoopTests(test_utils.TestCase): ...@@ -489,6 +489,11 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.loop.sock_recv(self.sock, 1024) self.loop.sock_recv(self.sock, 1024)
self.proactor.recv.assert_called_with(self.sock, 1024) self.proactor.recv.assert_called_with(self.sock, 1024)
def test_sock_recv_into(self):
buf = bytearray(10)
self.loop.sock_recv_into(self.sock, buf)
self.proactor.recv_into.assert_called_with(self.sock, buf)
def test_sock_sendall(self): def test_sock_sendall(self):
self.loop.sock_sendall(self.sock, b'data') self.loop.sock_sendall(self.sock, b'data')
self.proactor.send.assert_called_with(self.sock, b'data') self.proactor.send.assert_called_with(self.sock, b'data')
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment