Commit 44d1a591 authored by Andrew Svetlov's avatar Andrew Svetlov Committed by GitHub

bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)

parent 95084026
...@@ -528,6 +528,28 @@ Task functions ...@@ -528,6 +528,28 @@ Task functions
the event loop object used by the underlying task or coroutine. If it's the event loop object used by the underlying task or coroutine. If it's
not provided, the default event loop is used. not provided, the default event loop is used.
.. function:: current_task(loop=None):
Return the current running :class:`Task` instance or ``None``, if
no task is running.
If *loop* is ``None`` :func:`get_running_loop` is used to get
the current loop.
.. versionadded:: 3.7
.. function:: all_tasks(loop=None):
Return a set of :class:`Task` objects created for the loop.
If *loop* is ``None`` :func:`get_event_loop` is used for getting
current loop.
.. versionadded:: 3.7
.. function:: as_completed(fs, \*, loop=None, timeout=None) .. function:: as_completed(fs, \*, loop=None, timeout=None)
Return an iterator whose values, when waited for, are :class:`Future` Return an iterator whose values, when waited for, are :class:`Future`
......
...@@ -5,6 +5,8 @@ __all__ = ( ...@@ -5,6 +5,8 @@ __all__ = (
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'wait', 'wait_for', 'as_completed', 'sleep',
'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
'current_task', 'all_tasks',
'_register_task', '_unregister_task', '_enter_task', '_leave_task',
) )
import concurrent.futures import concurrent.futures
...@@ -21,6 +23,20 @@ from . import futures ...@@ -21,6 +23,20 @@ from . import futures
from .coroutines import coroutine from .coroutines import coroutine
def current_task(loop=None):
"""Return a currently executed task."""
if loop is None:
loop = events.get_running_loop()
return _current_tasks.get(loop)
def all_tasks(loop=None):
"""Return a set of all tasks for the loop."""
if loop is None:
loop = events.get_event_loop()
return {t for t, l in _all_tasks.items() if l is loop}
class Task(futures.Future): class Task(futures.Future):
"""A coroutine wrapped in a Future.""" """A coroutine wrapped in a Future."""
...@@ -33,13 +49,6 @@ class Task(futures.Future): ...@@ -33,13 +49,6 @@ class Task(futures.Future):
# _wakeup(). When _fut_waiter is not None, one of its callbacks # _wakeup(). When _fut_waiter is not None, one of its callbacks
# must be _wakeup(). # must be _wakeup().
# Weak set containing all tasks alive.
_all_tasks = weakref.WeakSet()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
_current_tasks = {}
# If False, don't log a message if the task is destroyed whereas its # If False, don't log a message if the task is destroyed whereas its
# status is still pending # status is still pending
_log_destroy_pending = True _log_destroy_pending = True
...@@ -52,9 +61,13 @@ class Task(futures.Future): ...@@ -52,9 +61,13 @@ class Task(futures.Future):
None is returned when called not in the context of a Task. None is returned when called not in the context of a Task.
""" """
warnings.warn("Task.current_task() is deprecated, "
"use asyncio.current_task() instead",
PendingDeprecationWarning,
stacklevel=2)
if loop is None: if loop is None:
loop = events.get_event_loop() loop = events.get_event_loop()
return cls._current_tasks.get(loop) return current_task(loop)
@classmethod @classmethod
def all_tasks(cls, loop=None): def all_tasks(cls, loop=None):
...@@ -62,9 +75,11 @@ class Task(futures.Future): ...@@ -62,9 +75,11 @@ class Task(futures.Future):
By default all tasks for the current event loop are returned. By default all tasks for the current event loop are returned.
""" """
if loop is None: warnings.warn("Task.all_tasks() is deprecated, "
loop = events.get_event_loop() "use asyncio.all_tasks() instead",
return {t for t in cls._all_tasks if t._loop is loop} PendingDeprecationWarning,
stacklevel=2)
return all_tasks(loop)
def __init__(self, coro, *, loop=None): def __init__(self, coro, *, loop=None):
super().__init__(loop=loop) super().__init__(loop=loop)
...@@ -81,7 +96,7 @@ class Task(futures.Future): ...@@ -81,7 +96,7 @@ class Task(futures.Future):
self._coro = coro self._coro = coro
self._loop.call_soon(self._step) self._loop.call_soon(self._step)
self.__class__._all_tasks.add(self) _register_task(self._loop, self)
def __del__(self): def __del__(self):
if self._state == futures._PENDING and self._log_destroy_pending: if self._state == futures._PENDING and self._log_destroy_pending:
...@@ -173,7 +188,7 @@ class Task(futures.Future): ...@@ -173,7 +188,7 @@ class Task(futures.Future):
coro = self._coro coro = self._coro
self._fut_waiter = None self._fut_waiter = None
self.__class__._current_tasks[self._loop] = self _enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None). # Call either coro.throw(exc) or coro.send(None).
try: try:
if exc is None: if exc is None:
...@@ -237,7 +252,7 @@ class Task(futures.Future): ...@@ -237,7 +252,7 @@ class Task(futures.Future):
new_exc = RuntimeError(f'Task got bad yield: {result!r}') new_exc = RuntimeError(f'Task got bad yield: {result!r}')
self._loop.call_soon(self._step, new_exc) self._loop.call_soon(self._step, new_exc)
finally: finally:
self.__class__._current_tasks.pop(self._loop) _leave_task(self._loop, self)
self = None # Needed to break cycles when an exception occurs. self = None # Needed to break cycles when an exception occurs.
def _wakeup(self, future): def _wakeup(self, future):
...@@ -715,3 +730,61 @@ def run_coroutine_threadsafe(coro, loop): ...@@ -715,3 +730,61 @@ def run_coroutine_threadsafe(coro, loop):
loop.call_soon_threadsafe(callback) loop.call_soon_threadsafe(callback)
return future return future
# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
# Task should be a weak reference to remove entry on task garbage
# collection, EventLoop is required
# to not access to private task._loop attribute.
_all_tasks = weakref.WeakKeyDictionary()
# Dictionary containing tasks that are currently active in
# all running event loops. {EventLoop: Task}
_current_tasks = {}
def _register_task(loop, task):
"""Register a new task in asyncio as executed by loop.
Returns None.
"""
_all_tasks[task] = loop
def _enter_task(loop, task):
current_task = _current_tasks.get(loop)
if current_task is not None:
raise RuntimeError(f"Cannot enter into task {task!r} while another "
f"task {current_task!r} is being executed.")
_current_tasks[loop] = task
def _leave_task(loop, task):
current_task = _current_tasks.get(loop)
if current_task is not task:
raise RuntimeError(f"Leaving task {task!r} does not match "
f"the current task {current_task!r}.")
del _current_tasks[loop]
def _unregister_task(loop, task):
_all_tasks.pop(task, None)
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
try:
from _asyncio import (_register_task, _unregister_task,
_enter_task, _leave_task,
_all_tasks, _current_tasks)
except ImportError:
pass
else:
_c_register_task = _register_task
_c_unregister_task = _unregister_task
_c_enter_task = _enter_task
_c_leave_task = _leave_task
This diff is collapsed.
Implement ``asyncio.current_task()`` and ``asyncio.all_tasks()``. Add
helpers intended to be used by alternative task implementations:
``asyncio._register_task``, ``asyncio._enter_task``, ``asyncio._leave_task``
and ``asyncio._unregister_task``. Deprecate ``asyncio.Task.current_task()``
and ``asyncio.Task.all_tasks()``.
This diff is collapsed.
...@@ -595,4 +595,142 @@ _asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored)) ...@@ -595,4 +595,142 @@ _asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
{ {
return _asyncio_get_running_loop_impl(module); return _asyncio_get_running_loop_impl(module);
} }
/*[clinic end generated code: output=21e5424c3a5572b0 input=a9049054013a1b77]*/
PyDoc_STRVAR(_asyncio__register_task__doc__,
"_register_task($module, /, loop, task)\n"
"--\n"
"\n"
"Register a new task in asyncio as executed by loop.\n"
"\n"
"Returns None.");
#define _ASYNCIO__REGISTER_TASK_METHODDEF \
{"_register_task", (PyCFunction)_asyncio__register_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__register_task__doc__},
static PyObject *
_asyncio__register_task_impl(PyObject *module, PyObject *loop,
PyObject *task);
static PyObject *
_asyncio__register_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"loop", "task", NULL};
static _PyArg_Parser _parser = {"OO:_register_task", _keywords, 0};
PyObject *loop;
PyObject *task;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
&loop, &task)) {
goto exit;
}
return_value = _asyncio__register_task_impl(module, loop, task);
exit:
return return_value;
}
PyDoc_STRVAR(_asyncio__unregister_task__doc__,
"_unregister_task($module, /, loop, task)\n"
"--\n"
"\n"
"Unregister a task.\n"
"\n"
"Returns None.");
#define _ASYNCIO__UNREGISTER_TASK_METHODDEF \
{"_unregister_task", (PyCFunction)_asyncio__unregister_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_task__doc__},
static PyObject *
_asyncio__unregister_task_impl(PyObject *module, PyObject *loop,
PyObject *task);
static PyObject *
_asyncio__unregister_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"loop", "task", NULL};
static _PyArg_Parser _parser = {"OO:_unregister_task", _keywords, 0};
PyObject *loop;
PyObject *task;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
&loop, &task)) {
goto exit;
}
return_value = _asyncio__unregister_task_impl(module, loop, task);
exit:
return return_value;
}
PyDoc_STRVAR(_asyncio__enter_task__doc__,
"_enter_task($module, /, loop, task)\n"
"--\n"
"\n"
"Enter into task execution or resume suspended task.\n"
"\n"
"Task belongs to loop.\n"
"\n"
"Returns None.");
#define _ASYNCIO__ENTER_TASK_METHODDEF \
{"_enter_task", (PyCFunction)_asyncio__enter_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__enter_task__doc__},
static PyObject *
_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task);
static PyObject *
_asyncio__enter_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"loop", "task", NULL};
static _PyArg_Parser _parser = {"OO:_enter_task", _keywords, 0};
PyObject *loop;
PyObject *task;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
&loop, &task)) {
goto exit;
}
return_value = _asyncio__enter_task_impl(module, loop, task);
exit:
return return_value;
}
PyDoc_STRVAR(_asyncio__leave_task__doc__,
"_leave_task($module, /, loop, task)\n"
"--\n"
"\n"
"Leave task execution or suspend a task.\n"
"\n"
"Task belongs to loop.\n"
"\n"
"Returns None.");
#define _ASYNCIO__LEAVE_TASK_METHODDEF \
{"_leave_task", (PyCFunction)_asyncio__leave_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__leave_task__doc__},
static PyObject *
_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task);
static PyObject *
_asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
static const char * const _keywords[] = {"loop", "task", NULL};
static _PyArg_Parser _parser = {"OO:_leave_task", _keywords, 0};
PyObject *loop;
PyObject *task;
if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
&loop, &task)) {
goto exit;
}
return_value = _asyncio__leave_task_impl(module, loop, task);
exit:
return return_value;
}
/*[clinic end generated code: output=0033af17965b51b4 input=a9049054013a1b77]*/
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment