From e1d62e0b7cc842d6b75b4d480391f4a94e503255 Mon Sep 17 00:00:00 2001
From: Andrey Egorov <andr06@gmail.com>
Date: Tue, 14 Nov 2017 12:18:59 +0300
Subject: [PATCH] =?UTF-8?q?bpo-32015:=20Asyncio=20looping=20during=20simul?=
 =?UTF-8?q?taneously=20socket=20read/write=20an=E2=80=A6=20(#4386)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* bpo-32015: Asyncio cycling during simultaneously socket read/write and reconnection

* Tests fix

* Tests fix

* News add

* Add new unit tests
---
 Lib/asyncio/selector_events.py                | 37 +++++----
 Lib/test/test_asyncio/test_selector_events.py | 78 ++++++++++++++-----
 .../2017-11-13-17-48-33.bpo-32015.4nqRTD.rst  |  2 +
 3 files changed, 79 insertions(+), 38 deletions(-)
 create mode 100644 Misc/NEWS.d/next/Library/2017-11-13-17-48-33.bpo-32015.4nqRTD.rst

diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 00d9a7ede29..f3b278c9ea7 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -370,25 +370,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
         fut = self.create_future()
-        self._sock_recv(fut, False, sock, n)
+        self._sock_recv(fut, None, sock, n)
         return fut
 
-    def _sock_recv(self, fut, registered, sock, n):
+    def _sock_recv(self, fut, registered_fd, sock, n):
         # _sock_recv() can add itself as an I/O callback if the operation can't
         # be done immediately. Don't use it directly, call sock_recv().
-        fd = sock.fileno()
-        if registered:
+        if registered_fd is not None:
             # Remove the callback early.  It should be rare that the
             # selector says the fd is ready but the call still returns
             # EAGAIN, and I am willing to take a hit in that case in
             # order to simplify the common case.
-            self.remove_reader(fd)
+            self.remove_reader(registered_fd)
         if fut.cancelled():
             return
         try:
             data = sock.recv(n)
         except (BlockingIOError, InterruptedError):
-            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
+            fd = sock.fileno()
+            self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
         except Exception as exc:
             fut.set_exception(exc)
         else:
@@ -405,25 +405,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
         fut = self.create_future()
-        self._sock_recv_into(fut, False, sock, buf)
+        self._sock_recv_into(fut, None, sock, buf)
         return fut
 
-    def _sock_recv_into(self, fut, registered, sock, buf):
+    def _sock_recv_into(self, fut, registered_fd, sock, buf):
         # _sock_recv_into() can add itself as an I/O callback if the operation
         # can't be done immediately. Don't use it directly, call sock_recv_into().
-        fd = sock.fileno()
-        if registered:
+        if registered_fd is not None:
             # Remove the callback early.  It should be rare that the
             # selector says the fd is ready but the call still returns
             # EAGAIN, and I am willing to take a hit in that case in
             # order to simplify the common case.
-            self.remove_reader(fd)
+            self.remove_reader(registered_fd)
         if fut.cancelled():
             return
         try:
             nbytes = sock.recv_into(buf)
         except (BlockingIOError, InterruptedError):
-            self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
+            fd = sock.fileno()
+            self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
         except Exception as exc:
             fut.set_exception(exc)
         else:
@@ -444,16 +444,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
             raise ValueError("the socket must be non-blocking")
         fut = self.create_future()
         if data:
-            self._sock_sendall(fut, False, sock, data)
+            self._sock_sendall(fut, None, sock, data)
         else:
             fut.set_result(None)
         return fut
 
-    def _sock_sendall(self, fut, registered, sock, data):
-        fd = sock.fileno()
-
-        if registered:
-            self.remove_writer(fd)
+    def _sock_sendall(self, fut, registered_fd, sock, data):
+        if registered_fd is not None:
+            self.remove_writer(registered_fd)
         if fut.cancelled():
             return
 
@@ -470,7 +468,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         else:
             if n:
                 data = data[n:]
-            self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
+            fd = sock.fileno()
+            self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
 
     @coroutine
     def sock_connect(self, sock, address):
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
index c50b3e49565..a3d118e1881 100644
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -182,7 +182,27 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
 
         f = self.loop.sock_recv(sock, 1024)
         self.assertIsInstance(f, asyncio.Future)
-        self.loop._sock_recv.assert_called_with(f, False, sock, 1024)
+        self.loop._sock_recv.assert_called_with(f, None, sock, 1024)
+
+    def test_sock_recv_reconnection(self):
+        sock = mock.Mock()
+        sock.fileno.return_value = 10
+        sock.recv.side_effect = BlockingIOError
+
+        self.loop.add_reader = mock.Mock()
+        self.loop.remove_reader = mock.Mock()
+        fut = self.loop.sock_recv(sock, 1024)
+        callback = self.loop.add_reader.call_args[0][1]
+        params = self.loop.add_reader.call_args[0][2:]
+
+        # emulate the old socket has closed, but the new one has
+        # the same fileno, so callback is called with old (closed) socket
+        sock.fileno.return_value = -1
+        sock.recv.side_effect = OSError(9)
+        callback(*params)
+
+        self.assertIsInstance(fut.exception(), OSError)
+        self.assertEqual((10,), self.loop.remove_reader.call_args[0])
 
     def test__sock_recv_canceled_fut(self):
         sock = mock.Mock()
@@ -190,7 +210,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         f = asyncio.Future(loop=self.loop)
         f.cancel()
 
-        self.loop._sock_recv(f, False, sock, 1024)
+        self.loop._sock_recv(f, None, sock, 1024)
         self.assertFalse(sock.recv.called)
 
     def test__sock_recv_unregister(self):
@@ -201,7 +221,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         f.cancel()
 
         self.loop.remove_reader = mock.Mock()
-        self.loop._sock_recv(f, True, sock, 1024)
+        self.loop._sock_recv(f, 10, sock, 1024)
         self.assertEqual((10,), self.loop.remove_reader.call_args[0])
 
     def test__sock_recv_tryagain(self):
@@ -211,8 +231,8 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.recv.side_effect = BlockingIOError
 
         self.loop.add_reader = mock.Mock()
-        self.loop._sock_recv(f, False, sock, 1024)
-        self.assertEqual((10, self.loop._sock_recv, f, True, sock, 1024),
+        self.loop._sock_recv(f, None, sock, 1024)
+        self.assertEqual((10, self.loop._sock_recv, f, 10, sock, 1024),
                          self.loop.add_reader.call_args[0])
 
     def test__sock_recv_exception(self):
@@ -221,7 +241,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.fileno.return_value = 10
         err = sock.recv.side_effect = OSError()
 
-        self.loop._sock_recv(f, False, sock, 1024)
+        self.loop._sock_recv(f, None, sock, 1024)
         self.assertIs(err, f.exception())
 
     def test_sock_sendall(self):
@@ -231,7 +251,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         f = self.loop.sock_sendall(sock, b'data')
         self.assertIsInstance(f, asyncio.Future)
         self.assertEqual(
-            (f, False, sock, b'data'),
+            (f, None, sock, b'data'),
             self.loop._sock_sendall.call_args[0])
 
     def test_sock_sendall_nodata(self):
@@ -244,13 +264,33 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         self.assertIsNone(f.result())
         self.assertFalse(self.loop._sock_sendall.called)
 
+    def test_sock_sendall_reconnection(self):
+        sock = mock.Mock()
+        sock.fileno.return_value = 10
+        sock.send.side_effect = BlockingIOError
+
+        self.loop.add_writer = mock.Mock()
+        self.loop.remove_writer = mock.Mock()
+        fut = self.loop.sock_sendall(sock, b'data')
+        callback = self.loop.add_writer.call_args[0][1]
+        params = self.loop.add_writer.call_args[0][2:]
+
+        # emulate the old socket has closed, but the new one has
+        # the same fileno, so callback is called with old (closed) socket
+        sock.fileno.return_value = -1
+        sock.send.side_effect = OSError(9)
+        callback(*params)
+
+        self.assertIsInstance(fut.exception(), OSError)
+        self.assertEqual((10,), self.loop.remove_writer.call_args[0])
+
     def test__sock_sendall_canceled_fut(self):
         sock = mock.Mock()
 
         f = asyncio.Future(loop=self.loop)
         f.cancel()
 
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertFalse(sock.send.called)
 
     def test__sock_sendall_unregister(self):
@@ -261,7 +301,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         f.cancel()
 
         self.loop.remove_writer = mock.Mock()
-        self.loop._sock_sendall(f, True, sock, b'data')
+        self.loop._sock_sendall(f, 10, sock, b'data')
         self.assertEqual((10,), self.loop.remove_writer.call_args[0])
 
     def test__sock_sendall_tryagain(self):
@@ -271,9 +311,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.send.side_effect = BlockingIOError
 
         self.loop.add_writer = mock.Mock()
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertEqual(
-            (10, self.loop._sock_sendall, f, True, sock, b'data'),
+            (10, self.loop._sock_sendall, f, 10, sock, b'data'),
             self.loop.add_writer.call_args[0])
 
     def test__sock_sendall_interrupted(self):
@@ -283,9 +323,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.send.side_effect = InterruptedError
 
         self.loop.add_writer = mock.Mock()
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertEqual(
-            (10, self.loop._sock_sendall, f, True, sock, b'data'),
+            (10, self.loop._sock_sendall, f, 10, sock, b'data'),
             self.loop.add_writer.call_args[0])
 
     def test__sock_sendall_exception(self):
@@ -294,7 +334,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.fileno.return_value = 10
         err = sock.send.side_effect = OSError()
 
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertIs(f.exception(), err)
 
     def test__sock_sendall(self):
@@ -304,7 +344,7 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.fileno.return_value = 10
         sock.send.return_value = 4
 
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertTrue(f.done())
         self.assertIsNone(f.result())
 
@@ -316,10 +356,10 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.send.return_value = 2
 
         self.loop.add_writer = mock.Mock()
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertFalse(f.done())
         self.assertEqual(
-            (10, self.loop._sock_sendall, f, True, sock, b'ta'),
+            (10, self.loop._sock_sendall, f, 10, sock, b'ta'),
             self.loop.add_writer.call_args[0])
 
     def test__sock_sendall_none(self):
@@ -330,10 +370,10 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
         sock.send.return_value = 0
 
         self.loop.add_writer = mock.Mock()
-        self.loop._sock_sendall(f, False, sock, b'data')
+        self.loop._sock_sendall(f, None, sock, b'data')
         self.assertFalse(f.done())
         self.assertEqual(
-            (10, self.loop._sock_sendall, f, True, sock, b'data'),
+            (10, self.loop._sock_sendall, f, 10, sock, b'data'),
             self.loop.add_writer.call_args[0])
 
     def test_sock_connect_timeout(self):
diff --git a/Misc/NEWS.d/next/Library/2017-11-13-17-48-33.bpo-32015.4nqRTD.rst b/Misc/NEWS.d/next/Library/2017-11-13-17-48-33.bpo-32015.4nqRTD.rst
new file mode 100644
index 00000000000..6117e5625d7
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-11-13-17-48-33.bpo-32015.4nqRTD.rst
@@ -0,0 +1,2 @@
+Fixed the looping of asyncio in the case of reconnection the socket during
+waiting async read/write from/to the socket.
-- 
2.30.9