Commit bb2fc5b2 authored by Victor Stinner's avatar Victor Stinner

Issue #21326: Add a new is_closed() method to asyncio.BaseEventLoop

Add BaseEventLoop._closed attribute and use it to check if the event loop was
closed or not, instead of checking different attributes in each subclass of
BaseEventLoop.

run_forever() and run_until_complete() methods now raise a RuntimeError('Event loop is
closed') exception if the event loop was closed.

BaseProactorEventLoop.close() now also cancels "accept futures".
parent 15386652
...@@ -119,6 +119,12 @@ Run an event loop ...@@ -119,6 +119,12 @@ Run an event loop
Callback scheduled after :meth:`stop` is called won't. However, those Callback scheduled after :meth:`stop` is called won't. However, those
callbacks will run if :meth:`run_forever` is called again later. callbacks will run if :meth:`run_forever` is called again later.
.. method:: BaseEventLoop.is_closed()
Returns ``True`` if the event loop was closed.
.. versionadded:: 3.4.2
.. method:: BaseEventLoop.close() .. method:: BaseEventLoop.close()
Close the event loop. The loop should not be running. Close the event loop. The loop should not be running.
......
...@@ -119,6 +119,7 @@ class Server(events.AbstractServer): ...@@ -119,6 +119,7 @@ class Server(events.AbstractServer):
class BaseEventLoop(events.AbstractEventLoop): class BaseEventLoop(events.AbstractEventLoop):
def __init__(self): def __init__(self):
self._closed = False
self._ready = collections.deque() self._ready = collections.deque()
self._scheduled = [] self._scheduled = []
self._default_executor = None self._default_executor = None
...@@ -128,6 +129,11 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -128,6 +129,11 @@ class BaseEventLoop(events.AbstractEventLoop):
self._exception_handler = None self._exception_handler = None
self._debug = False self._debug = False
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
% (self.__class__.__name__, self.is_running(),
self.is_closed(), self.get_debug()))
def _make_socket_transport(self, sock, protocol, waiter=None, *, def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None): extra=None, server=None):
"""Create socket transport.""" """Create socket transport."""
...@@ -173,8 +179,13 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -173,8 +179,13 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Process selector events.""" """Process selector events."""
raise NotImplementedError raise NotImplementedError
def _check_closed(self):
if self._closed:
raise RuntimeError('Event loop is closed')
def run_forever(self): def run_forever(self):
"""Run until stop() is called.""" """Run until stop() is called."""
self._check_closed()
if self._running: if self._running:
raise RuntimeError('Event loop is running.') raise RuntimeError('Event loop is running.')
self._running = True self._running = True
...@@ -198,6 +209,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -198,6 +209,7 @@ class BaseEventLoop(events.AbstractEventLoop):
Return the Future's result, or raise its exception. Return the Future's result, or raise its exception.
""" """
self._check_closed()
future = tasks.async(future, loop=self) future = tasks.async(future, loop=self)
future.add_done_callback(_raise_stop_error) future.add_done_callback(_raise_stop_error)
self.run_forever() self.run_forever()
...@@ -222,6 +234,9 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -222,6 +234,9 @@ class BaseEventLoop(events.AbstractEventLoop):
This clears the queues and shuts down the executor, This clears the queues and shuts down the executor,
but does not wait for the executor to finish. but does not wait for the executor to finish.
""" """
if self._closed:
return
self._closed = True
self._ready.clear() self._ready.clear()
self._scheduled.clear() self._scheduled.clear()
executor = self._default_executor executor = self._default_executor
...@@ -229,6 +244,10 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -229,6 +244,10 @@ class BaseEventLoop(events.AbstractEventLoop):
self._default_executor = None self._default_executor = None
executor.shutdown(wait=False) executor.shutdown(wait=False)
def is_closed(self):
"""Returns True if the event loop was closed."""
return self._closed
def is_running(self): def is_running(self):
"""Returns running status of event loop.""" """Returns running status of event loop."""
return self._running return self._running
......
...@@ -353,13 +353,14 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): ...@@ -353,13 +353,14 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
sock, protocol, waiter, extra) sock, protocol, waiter, extra)
def close(self): def close(self):
if self._proactor is not None: if self.is_closed():
self._close_self_pipe() return
self._proactor.close() self._stop_accept_futures()
self._proactor = None self._close_self_pipe()
self._selector = None self._proactor.close()
super().close() self._proactor = None
self._accept_futures.clear() self._selector = None
super().close()
def sock_recv(self, sock, n): def sock_recv(self, sock, n):
return self._proactor.recv(sock, n) return self._proactor.recv(sock, n)
...@@ -428,6 +429,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): ...@@ -428,6 +429,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._make_socket_transport( self._make_socket_transport(
conn, protocol, conn, protocol,
extra={'peername': addr}, server=server) extra={'peername': addr}, server=server)
if self.is_closed():
return
f = self._proactor.accept(sock) f = self._proactor.accept(sock)
except OSError as exc: except OSError as exc:
if sock.fileno() != -1: if sock.fileno() != -1:
...@@ -448,8 +451,12 @@ class BaseProactorEventLoop(base_events.BaseEventLoop): ...@@ -448,8 +451,12 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
def _process_events(self, event_list): def _process_events(self, event_list):
pass # XXX hard work currently done in poll pass # XXX hard work currently done in poll
def _stop_serving(self, sock): def _stop_accept_futures(self):
for future in self._accept_futures.values(): for future in self._accept_futures.values():
future.cancel() future.cancel()
self._accept_futures.clear()
def _stop_serving(self, sock):
self._stop_accept_futures()
self._proactor._stop_serving(sock) self._proactor._stop_serving(sock)
sock.close() sock.close()
...@@ -55,11 +55,13 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -55,11 +55,13 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
return _SelectorDatagramTransport(self, sock, protocol, address, extra) return _SelectorDatagramTransport(self, sock, protocol, address, extra)
def close(self): def close(self):
if self.is_closed():
return
self._close_self_pipe()
if self._selector is not None: if self._selector is not None:
self._close_self_pipe()
self._selector.close() self._selector.close()
self._selector = None self._selector = None
super().close() super().close()
def _socketpair(self): def _socketpair(self):
raise NotImplementedError raise NotImplementedError
...@@ -143,8 +145,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -143,8 +145,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def add_reader(self, fd, callback, *args): def add_reader(self, fd, callback, *args):
"""Add a reader callback.""" """Add a reader callback."""
if self._selector is None: self._check_closed()
raise RuntimeError('Event loop is closed')
handle = events.Handle(callback, args, self) handle = events.Handle(callback, args, self)
try: try:
key = self._selector.get_key(fd) key = self._selector.get_key(fd)
...@@ -160,7 +161,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -160,7 +161,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def remove_reader(self, fd): def remove_reader(self, fd):
"""Remove a reader callback.""" """Remove a reader callback."""
if self._selector is None: if self.is_closed():
return False return False
try: try:
key = self._selector.get_key(fd) key = self._selector.get_key(fd)
...@@ -182,8 +183,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -182,8 +183,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def add_writer(self, fd, callback, *args): def add_writer(self, fd, callback, *args):
"""Add a writer callback..""" """Add a writer callback.."""
if self._selector is None: self._check_closed()
raise RuntimeError('Event loop is closed')
handle = events.Handle(callback, args, self) handle = events.Handle(callback, args, self)
try: try:
key = self._selector.get_key(fd) key = self._selector.get_key(fd)
...@@ -199,7 +199,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): ...@@ -199,7 +199,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
def remove_writer(self, fd): def remove_writer(self, fd):
"""Remove a writer callback.""" """Remove a writer callback."""
if self._selector is None: if self.is_closed():
return False return False
try: try:
key = self._selector.get_key(fd) key = self._selector.get_key(fd)
......
...@@ -52,6 +52,20 @@ class BaseEventLoopTests(unittest.TestCase): ...@@ -52,6 +52,20 @@ class BaseEventLoopTests(unittest.TestCase):
gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m) gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
self.assertRaises(NotImplementedError, next, iter(gen)) self.assertRaises(NotImplementedError, next, iter(gen))
def test_close(self):
self.assertFalse(self.loop.is_closed())
self.loop.close()
self.assertTrue(self.loop.is_closed())
# it should be possible to call close() more than once
self.loop.close()
self.loop.close()
# operation blocked when the loop is closed
f = asyncio.Future(loop=self.loop)
self.assertRaises(RuntimeError, self.loop.run_forever)
self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
def test__add_callback_handle(self): def test__add_callback_handle(self):
h = asyncio.Handle(lambda: False, (), self.loop) h = asyncio.Handle(lambda: False, (), self.loop)
......
...@@ -80,7 +80,10 @@ class BaseSelectorEventLoopTests(unittest.TestCase): ...@@ -80,7 +80,10 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop._selector.close() self.loop._selector.close()
self.loop._selector = selector = mock.Mock() self.loop._selector = selector = mock.Mock()
self.assertFalse(self.loop.is_closed())
self.loop.close() self.loop.close()
self.assertTrue(self.loop.is_closed())
self.assertIsNone(self.loop._selector) self.assertIsNone(self.loop._selector)
self.assertIsNone(self.loop._csock) self.assertIsNone(self.loop._csock)
self.assertIsNone(self.loop._ssock) self.assertIsNone(self.loop._ssock)
...@@ -89,9 +92,20 @@ class BaseSelectorEventLoopTests(unittest.TestCase): ...@@ -89,9 +92,20 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
csock.close.assert_called_with() csock.close.assert_called_with()
remove_reader.assert_called_with(7) remove_reader.assert_called_with(7)
# it should be possible to call close() more than once
self.loop.close() self.loop.close()
self.loop.close() self.loop.close()
# operation blocked when the loop is closed
f = asyncio.Future(loop=self.loop)
self.assertRaises(RuntimeError, self.loop.run_forever)
self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
fd = 0
def callback():
pass
self.assertRaises(RuntimeError, self.loop.add_reader, fd, callback)
self.assertRaises(RuntimeError, self.loop.add_writer, fd, callback)
def test_close_no_selector(self): def test_close_no_selector(self):
ssock = self.loop._ssock ssock = self.loop._ssock
csock = self.loop._csock csock = self.loop._csock
...@@ -101,9 +115,6 @@ class BaseSelectorEventLoopTests(unittest.TestCase): ...@@ -101,9 +115,6 @@ class BaseSelectorEventLoopTests(unittest.TestCase):
self.loop._selector = None self.loop._selector = None
self.loop.close() self.loop.close()
self.assertIsNone(self.loop._selector) self.assertIsNone(self.loop._selector)
self.assertFalse(ssock.close.called)
self.assertFalse(csock.close.called)
self.assertFalse(remove_reader.called)
def test_socketpair(self): def test_socketpair(self):
self.assertRaises(NotImplementedError, self.loop._socketpair) self.assertRaises(NotImplementedError, self.loop._socketpair)
......
...@@ -22,6 +22,10 @@ Core and Builtins ...@@ -22,6 +22,10 @@ Core and Builtins
Library Library
------- -------
- Issue #21326: Add a new is_closed() method to asyncio.BaseEventLoop.
run_forever() and run_until_complete() methods of asyncio.BaseEventLoop now
raise an exception if the event loop was closed.
- Issue #21310: Fixed possible resource leak in failed open(). - Issue #21310: Fixed possible resource leak in failed open().
- Issue #21677: Fixed chaining nonnormalized exceptions in io close() methods. - Issue #21677: Fixed chaining nonnormalized exceptions in io close() methods.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment