Commit 8433fdbe authored by Yury Selivanov's avatar Yury Selivanov

asyncio: Sync with the upstream

parent db9f3355
......@@ -28,6 +28,7 @@ import time
import traceback
import sys
import warnings
import weakref
from . import compat
from . import coroutines
......@@ -242,6 +243,17 @@ class BaseEventLoop(events.AbstractEventLoop):
self._task_factory = None
self._coroutine_wrapper_set = False
if hasattr(sys, 'get_asyncgen_hooks'):
# Python >= 3.6
# A weak set of all asynchronous generators that are
# being iterated by the loop.
self._asyncgens = weakref.WeakSet()
else:
self._asyncgens = None
# Set to True when `loop.shutdown_asyncgens` is called.
self._asyncgens_shutdown_called = False
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
% (self.__class__.__name__, self.is_running(),
......@@ -333,6 +345,48 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._closed:
raise RuntimeError('Event loop is closed')
def _asyncgen_finalizer_hook(self, agen):
self._asyncgens.discard(agen)
if not self.is_closed():
self.create_task(agen.aclose())
def _asyncgen_firstiter_hook(self, agen):
if self._asyncgens_shutdown_called:
warnings.warn(
"asynchronous generator {!r} was scheduled after "
"loop.shutdown_asyncgens() call".format(agen),
ResourceWarning, source=self)
self._asyncgens.add(agen)
@coroutine
def shutdown_asyncgens(self):
"""Shutdown all active asynchronous generators."""
self._asyncgens_shutdown_called = True
if self._asyncgens is None or not len(self._asyncgens):
# If Python version is <3.6 or we don't have any asynchronous
# generators alive.
return
closing_agens = list(self._asyncgens)
self._asyncgens.clear()
shutdown_coro = tasks.gather(
*[ag.aclose() for ag in closing_agens],
return_exceptions=True,
loop=self)
results = yield from shutdown_coro
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
self.call_exception_handler({
'message': 'an error occurred during closing of '
'asynchronous generator {!r}'.format(agen),
'exception': result,
'asyncgen': agen
})
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
......@@ -340,6 +394,10 @@ class BaseEventLoop(events.AbstractEventLoop):
raise RuntimeError('Event loop is running.')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
if self._asyncgens is not None:
old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
while True:
self._run_once()
......@@ -349,6 +407,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._stopping = False
self._thread_id = None
self._set_coroutine_wrapper(False)
if self._asyncgens is not None:
sys.set_asyncgen_hooks(*old_agen_hooks)
def run_until_complete(self, future):
"""Run until the Future is done.
......
......@@ -248,6 +248,10 @@ class AbstractEventLoop:
"""
raise NotImplementedError
def shutdown_asyncgens(self):
"""Shutdown all active asynchronous generators."""
raise NotImplementedError
# Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment