Commit f951d28a authored by Victor Stinner

asyncio: sync with Tulip, add a new asyncio.coroutines module

parent 61f32cb5
@@ -18,6 +18,7 @@ if sys.platform == 'win32':
     import _overlapped  # Will also be exported.

 # This relies on each of the submodules having an __all__ variable.
+from .coroutines import *
 from .events import *
 from .futures import *
 from .locks import *
@@ -34,7 +35,8 @@ else:
     from .unix_events import *  # pragma: no cover


-__all__ = (events.__all__ +
+__all__ = (coroutines.__all__ +
+           events.__all__ +
            futures.__all__ +
            locks.__all__ +
            protocols.__all__ +
...
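
A minimal usage sketch (not part of the patch): because coroutines.__all__ is now folded into the package __all__, the decorator and the introspection helpers stay reachable from the top-level asyncio namespace.

import asyncio

@asyncio.coroutine              # now defined in asyncio/coroutines.py
def greet():
    yield from asyncio.sleep(0)
    return 'hello'

print(asyncio.iscoroutinefunction(greet))   # True
loop = asyncio.get_event_loop()
print(loop.run_until_complete(greet()))     # 'hello'
loop.close()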
@@ -26,9 +26,11 @@ import time
 import os
 import sys

+from . import coroutines
 from . import events
 from . import futures
 from . import tasks
+from .coroutines import coroutine
 from .log import logger

@@ -118,7 +120,7 @@ class Server(events.AbstractServer):
             if not waiter.done():
                 waiter.set_result(waiter)

-    @tasks.coroutine
+    @coroutine
     def wait_closed(self):
         if self.sockets is None or self.waiters is None:
             return
@@ -175,7 +177,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         """Create write pipe transport."""
         raise NotImplementedError

-    @tasks.coroutine
+    @coroutine
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
@@ -298,7 +300,7 @@ class BaseEventLoop(events.AbstractEventLoop):

     def call_at(self, when, callback, *args):
         """Like call_later(), but uses an absolute time."""
-        if tasks.iscoroutinefunction(callback):
+        if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with call_at()")
         if self._debug:
             self._assert_is_current_event_loop()
@@ -324,7 +326,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         return handle

     def _call_soon(self, callback, args, check_loop):
-        if tasks.iscoroutinefunction(callback):
+        if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with call_soon()")
         if self._debug and check_loop:
             self._assert_is_current_event_loop()
@@ -361,7 +363,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         return handle

     def run_in_executor(self, executor, callback, *args):
-        if tasks.iscoroutinefunction(callback):
+        if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with run_in_executor()")
         if isinstance(callback, events.Handle):
             assert not args
@@ -389,7 +391,7 @@ class BaseEventLoop(events.AbstractEventLoop):
     def getnameinfo(self, sockaddr, flags=0):
         return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)

-    @tasks.coroutine
+    @coroutine
     def create_connection(self, protocol_factory, host=None, port=None, *,
                           ssl=None, family=0, proto=0, flags=0, sock=None,
                           local_addr=None, server_hostname=None):
@@ -505,7 +507,7 @@ class BaseEventLoop(events.AbstractEventLoop):
             sock, protocol_factory, ssl, server_hostname)
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def _create_connection_transport(self, sock, protocol_factory, ssl,
                                      server_hostname):
         protocol = protocol_factory()
@@ -521,7 +523,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         yield from waiter
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def create_datagram_endpoint(self, protocol_factory,
                                  local_addr=None, remote_addr=None, *,
                                  family=0, proto=0, flags=0):
@@ -593,7 +595,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         transport = self._make_datagram_transport(sock, protocol, r_addr)
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def create_server(self, protocol_factory, host=None, port=None,
                       *,
                       family=socket.AF_UNSPEC,
@@ -672,7 +674,7 @@ class BaseEventLoop(events.AbstractEventLoop):
             self._start_serving(protocol_factory, sock, ssl, server)
         return server

-    @tasks.coroutine
+    @coroutine
     def connect_read_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = futures.Future(loop=self)
@@ -680,7 +682,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         yield from waiter
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def connect_write_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = futures.Future(loop=self)
@@ -688,7 +690,7 @@ class BaseEventLoop(events.AbstractEventLoop):
         yield from waiter
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          universal_newlines=False, shell=True, bufsize=0,
@@ -706,7 +708,7 @@ class BaseEventLoop(events.AbstractEventLoop):
             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def subprocess_exec(self, protocol_factory, program, *args,
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=False,
...
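
A hedged sketch (not from the patch) of the behaviour the call_soon()/call_at()/run_in_executor() hunks guard: coroutine functions are still rejected by the callback-based APIs, only the module providing iscoroutinefunction() changed.

import asyncio

@asyncio.coroutine
def job():
    yield from asyncio.sleep(0)

loop = asyncio.get_event_loop()
try:
    # call_soon() expects a plain callback; the iscoroutinefunction()
    # check shown above raises TypeError for coroutine functions.
    loop.call_soon(job)
except TypeError as exc:
    print(exc)   # coroutines cannot be used with call_soon()

# The supported way is to run it as a coroutine/Task instead.
loop.run_until_complete(job())
loop.close()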
@@ -2,8 +2,8 @@ import collections
 import subprocess

 from . import protocols
-from . import tasks
 from . import transports
+from .coroutines import coroutine


 class BaseSubprocessTransport(transports.SubprocessTransport):
@@ -65,7 +65,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
     def kill(self):
         self._proc.kill()

-    @tasks.coroutine
+    @coroutine
     def _post_init(self):
         proc = self._proc
         loop = self._loop
...
__all__ = ['coroutine',
           'iscoroutinefunction', 'iscoroutine']

import functools
import inspect
import os
import sys
import traceback

from . import events
from . import futures
from .log import logger


# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below). That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call. Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines. A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = (not sys.flags.ignore_environment
          and bool(os.environ.get('PYTHONASYNCIODEBUG')))

_PY35 = (sys.version_info >= (3, 5))


class CoroWrapper:
    # Wrapper for coroutine in _DEBUG mode.

    def __init__(self, gen, func):
        assert inspect.isgenerator(gen), gen
        self.gen = gen
        self.func = func
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.gen)

    def send(self, *value):
        # We use `*value` because of a bug in CPythons prior
        # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
        # for details. This workaround should be removed in 3.5.0.
        if len(value) == 1:
            value = value[0]
        return self.gen.send(value)

    def throw(self, exc):
        return self.gen.throw(exc)

    def close(self):
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        if frame is not None and frame.f_lasti == -1:
            func = events._format_callback(self.func, ())
            tb = ''.join(traceback.format_list(self._source_traceback))
            message = ('Coroutine %s was never yielded from\n'
                       'Coroutine object created at (most recent call last):\n'
                       '%s'
                       % (func, tb.rstrip()))
            logger.error(message)


def coroutine(func):
    """Decorator to mark coroutines.

    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.
    """
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if isinstance(res, futures.Future) or inspect.isgenerator(res):
                res = yield from res
            return res

    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func)
            if w._source_traceback:
                del w._source_traceback[-1]
            w.__name__ = func.__name__
            if _PY35:
                w.__qualname__ = func.__qualname__
            w.__doc__ = func.__doc__
            return w

    wrapper._is_coroutine = True  # For iscoroutinefunction().
    return wrapper


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    return getattr(func, '_is_coroutine', False)


def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)


def _format_coroutine(coro):
    assert iscoroutine(coro)
    if _PY35:
        coro_name = coro.__qualname__
    else:
        coro_name = coro.__name__

    filename = coro.gi_code.co_filename
    if coro.gi_frame is not None:
        lineno = coro.gi_frame.f_lineno
        return '%s() at %s:%s' % (coro_name, filename, lineno)
    else:
        lineno = coro.gi_code.co_firstlineno
        return '%s() done at %s:%s' % (coro_name, filename, lineno)
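
The module above is the core of the change: the @coroutine decorator, the CoroWrapper debug proxy and the introspection helpers now live in asyncio.coroutines instead of asyncio.tasks. A small sketch (not part of the patch) of the debug behaviour it implements; note that _DEBUG is read when the module is imported, so PYTHONASYNCIODEBUG must be set before importing asyncio.

# Run with PYTHONASYNCIODEBUG=1 to enable the CoroWrapper-based checks.
import asyncio
from asyncio import coroutines

@coroutines.coroutine
def compute():
    yield from asyncio.sleep(0)
    return 42

print(coroutines.iscoroutinefunction(compute))   # True
print(coroutines.iscoroutine(compute()))         # True

# Forgetting "yield from" (or run_until_complete) on a coroutine object is
# the mistake CoroWrapper reports: with _DEBUG enabled, dropping the object
# below logs "Coroutine ... was never yielded from" when it is collected.
compute()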
@@ -6,7 +6,7 @@ import collections

 from . import events
 from . import futures
-from . import tasks
+from .coroutines import coroutine


 class _ContextManager:
@@ -112,7 +112,7 @@ class Lock:
         """Return True if lock is acquired."""
         return self._locked

-    @tasks.coroutine
+    @coroutine
     def acquire(self):
         """Acquire a lock.
@@ -225,7 +225,7 @@ class Event:
        to true again."""
         self._value = False

-    @tasks.coroutine
+    @coroutine
     def wait(self):
         """Block until the internal flag is true.
@@ -278,7 +278,7 @@ class Condition:
             extra = '{},waiters:{}'.format(extra, len(self._waiters))
         return '<{} [{}]>'.format(res[1:-1], extra)

-    @tasks.coroutine
+    @coroutine
     def wait(self):
         """Wait until notified.
@@ -306,7 +306,7 @@ class Condition:
         finally:
             yield from self.acquire()

-    @tasks.coroutine
+    @coroutine
     def wait_for(self, predicate):
         """Wait until a predicate becomes true.
@@ -402,7 +402,7 @@ class Semaphore:
         """Returns True if semaphore can not be acquired immediately."""
         return self._value == 0

-    @tasks.coroutine
+    @coroutine
     def acquire(self):
         """Acquire a semaphore.
...
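
acquire() and wait() keep their coroutine semantics; only the decorator's import changes. A sketch (not from the patch) of how these primitives are driven in the yield-from style this code targets:

import asyncio

lock = asyncio.Lock()

@asyncio.coroutine
def worker(name):
    # Lock.acquire() is a coroutine (see the @coroutine hunk above),
    # so it has to be driven with "yield from".
    yield from lock.acquire()
    try:
        print(name, 'holds the lock')
        yield from asyncio.sleep(0)
    finally:
        lock.release()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(worker('a'), worker('b')))
loop.close()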
@@ -10,10 +10,12 @@ import socket

 if hasattr(socket, 'AF_UNIX'):
     __all__.extend(['open_unix_connection', 'start_unix_server'])

+from . import coroutines
 from . import events
 from . import futures
 from . import protocols
 from . import tasks
+from .coroutines import coroutine

 _DEFAULT_LIMIT = 2**16
@@ -33,7 +35,7 @@ class IncompleteReadError(EOFError):
         self.expected = expected


-@tasks.coroutine
+@coroutine
 def open_connection(host=None, port=None, *,
                     loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """A wrapper for create_connection() returning a (reader, writer) pair.
@@ -63,7 +65,7 @@ def open_connection(host=None, port=None, *,
     return reader, writer


-@tasks.coroutine
+@coroutine
 def start_server(client_connected_cb, host=None, port=None, *,
                  loop=None, limit=_DEFAULT_LIMIT, **kwds):
     """Start a socket server, call back for each client connected.
@@ -102,7 +104,7 @@ def start_server(client_connected_cb, host=None, port=None, *,
 if hasattr(socket, 'AF_UNIX'):
     # UNIX Domain Sockets are supported on this platform

-    @tasks.coroutine
+    @coroutine
     def open_unix_connection(path=None, *,
                              loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `open_connection` but works with UNIX Domain Sockets."""
@@ -116,7 +118,7 @@ if hasattr(socket, 'AF_UNIX'):
         return reader, writer

-    @tasks.coroutine
+    @coroutine
     def start_unix_server(client_connected_cb, path=None, *,
                           loop=None, limit=_DEFAULT_LIMIT, **kwds):
         """Similar to `start_server` but works with UNIX Domain Sockets."""
@@ -210,7 +212,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
                                                self._loop)
             res = self._client_connected_cb(self._stream_reader,
                                             self._stream_writer)
-            if tasks.iscoroutine(res):
+            if coroutines.iscoroutine(res):
                 tasks.Task(res, loop=self._loop)

     def connection_lost(self, exc):
@@ -373,7 +375,7 @@ class StreamReader:
                                'already waiting for incoming data' % func_name)
         return futures.Future(loop=self._loop)

-    @tasks.coroutine
+    @coroutine
     def readline(self):
         if self._exception is not None:
             raise self._exception
@@ -410,7 +412,7 @@ class StreamReader:
         self._maybe_resume_transport()
         return bytes(line)

-    @tasks.coroutine
+    @coroutine
     def read(self, n=-1):
         if self._exception is not None:
             raise self._exception
@@ -449,7 +451,7 @@ class StreamReader:
         self._maybe_resume_transport()
         return data

-    @tasks.coroutine
+    @coroutine
     def readexactly(self, n):
         if self._exception is not None:
             raise self._exception
...
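
The streams layer still accepts either a plain callback or a coroutine for client_connected_cb; the iscoroutine() check above is what wraps a coroutine result in a Task. A sketch of the pattern, not part of the patch, assuming port 8888 is free on localhost:

import asyncio

@asyncio.coroutine
def handle_client(reader, writer):
    # Because this callback is a coroutine, StreamReaderProtocol wraps the
    # returned coroutine object in a Task (the iscoroutine() check above).
    data = yield from reader.readline()
    writer.write(data)
    yield from writer.drain()
    writer.close()

@asyncio.coroutine
def main():
    server = yield from asyncio.start_server(handle_client, '127.0.0.1', 8888)
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888)
    writer.write(b'ping\n')
    line = yield from reader.readline()
    print(line)                       # b'ping\n'
    writer.close()
    server.close()
    yield from server.wait_closed()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()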
@@ -8,6 +8,7 @@ from . import futures
 from . import protocols
 from . import streams
 from . import tasks
+from .coroutines import coroutine


 PIPE = subprocess.PIPE
@@ -94,7 +95,7 @@ class Process:
     def returncode(self):
         return self._transport.get_returncode()

-    @tasks.coroutine
+    @coroutine
     def wait(self):
         """Wait until the process exit and return the process return code."""
         returncode = self._transport.get_returncode()
@@ -122,17 +123,17 @@ class Process:
         self._check_alive()
         self._transport.kill()

-    @tasks.coroutine
+    @coroutine
     def _feed_stdin(self, input):
         self.stdin.write(input)
         yield from self.stdin.drain()
         self.stdin.close()

-    @tasks.coroutine
+    @coroutine
     def _noop(self):
         return None

-    @tasks.coroutine
+    @coroutine
     def _read_stream(self, fd):
         transport = self._transport.get_pipe_transport(fd)
         if fd == 2:
@@ -144,7 +145,7 @@ class Process:
         transport.close()
         return output

-    @tasks.coroutine
+    @coroutine
     def communicate(self, input=None):
         if input:
             stdin = self._feed_stdin(input)
@@ -164,7 +165,7 @@ class Process:
         return (stdout, stderr)


-@tasks.coroutine
+@coroutine
 def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                             loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
     if loop is None:
@@ -178,7 +179,7 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
     yield from protocol.waiter
     return Process(transport, protocol, loop)


-@tasks.coroutine
+@coroutine
 def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                            stderr=None, loop=None,
                            limit=streams._DEFAULT_LIMIT, **kwds):
...
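
The high-level helpers above (wait(), communicate(), create_subprocess_shell(), create_subprocess_exec()) are all coroutines. A usage sketch, not from the patch, assuming a POSIX system with an echo binary and an event loop that supports subprocesses:

import asyncio

@asyncio.coroutine
def run_echo():
    proc = yield from asyncio.create_subprocess_exec(
        'echo', 'hello', stdout=asyncio.subprocess.PIPE)
    # communicate() and wait() are coroutines, as the hunks above show.
    stdout, stderr = yield from proc.communicate()
    print(stdout, proc.returncode)   # b'hello\n' 0

loop = asyncio.get_event_loop()
loop.run_until_complete(run_echo())
loop.close()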
"""Support for tasks, coroutines and the scheduler.""" """Support for tasks, coroutines and the scheduler."""
__all__ = ['coroutine', 'Task', __all__ = ['Task',
'iscoroutinefunction', 'iscoroutine',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async', 'wait', 'wait_for', 'as_completed', 'sleep', 'async',
'gather', 'shield', 'gather', 'shield',
...@@ -11,146 +10,20 @@ import concurrent.futures ...@@ -11,146 +10,20 @@ import concurrent.futures
import functools import functools
import inspect import inspect
import linecache import linecache
import os
import sys import sys
import traceback import traceback
import weakref import weakref
from . import coroutines
from . import events from . import events
from . import futures from . import futures
from .coroutines import coroutine
from .log import logger from .log import logger
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below). That
# instance will log a message when the generator is never iterated
# over, which may happen when you forget to use "yield from" with a
# coroutine call. Note that the value of the _DEBUG flag is taken
# when the decorator is used, so to be of any use it must be set
# before you define your coroutines. A downside of using this feature
# is that tracebacks show entries for the CoroWrapper.__next__ method
# when _DEBUG is true.
_DEBUG = (not sys.flags.ignore_environment
and bool(os.environ.get('PYTHONASYNCIODEBUG')))
_PY34 = (sys.version_info >= (3, 4)) _PY34 = (sys.version_info >= (3, 4))
_PY35 = (sys.version_info >= (3, 5)) _PY35 = (sys.version_info >= (3, 5))
class CoroWrapper:
# Wrapper for coroutine in _DEBUG mode.
def __init__(self, gen, func):
assert inspect.isgenerator(gen), gen
self.gen = gen
self.func = func
self._source_traceback = traceback.extract_stack(sys._getframe(1))
def __iter__(self):
return self
def __next__(self):
return next(self.gen)
def send(self, *value):
# We use `*value` because of a bug in CPythons prior
# to 3.4.1. See issue #21209 and test_yield_from_corowrapper
# for details. This workaround should be removed in 3.5.0.
if len(value) == 1:
value = value[0]
return self.gen.send(value)
def throw(self, exc):
return self.gen.throw(exc)
def close(self):
return self.gen.close()
@property
def gi_frame(self):
return self.gen.gi_frame
@property
def gi_running(self):
return self.gen.gi_running
@property
def gi_code(self):
return self.gen.gi_code
def __del__(self):
# Be careful accessing self.gen.frame -- self.gen might not exist.
gen = getattr(self, 'gen', None)
frame = getattr(gen, 'gi_frame', None)
if frame is not None and frame.f_lasti == -1:
func = events._format_callback(self.func, ())
tb = ''.join(traceback.format_list(self._source_traceback))
message = ('Coroutine %s was never yielded from\n'
'Coroutine object created at (most recent call last):\n'
'%s'
% (func, tb.rstrip()))
logger.error(message)
def coroutine(func):
"""Decorator to mark coroutines.
If the coroutine is not yielded from before it is destroyed,
an error message is logged.
"""
if inspect.isgeneratorfunction(func):
coro = func
else:
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
if isinstance(res, futures.Future) or inspect.isgenerator(res):
res = yield from res
return res
if not _DEBUG:
wrapper = coro
else:
@functools.wraps(func)
def wrapper(*args, **kwds):
w = CoroWrapper(coro(*args, **kwds), func)
if w._source_traceback:
del w._source_traceback[-1]
w.__name__ = func.__name__
if _PY35:
w.__qualname__ = func.__qualname__
w.__doc__ = func.__doc__
return w
wrapper._is_coroutine = True # For iscoroutinefunction().
return wrapper
def iscoroutinefunction(func):
"""Return True if func is a decorated coroutine function."""
return getattr(func, '_is_coroutine', False)
def iscoroutine(obj):
"""Return True if obj is a coroutine object."""
return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
def _format_coroutine(coro):
assert iscoroutine(coro)
if _PY35:
coro_name = coro.__qualname__
else:
coro_name = coro.__name__
filename = coro.gi_code.co_filename
if coro.gi_frame is not None:
lineno = coro.gi_frame.f_lineno
return '%s() at %s:%s' % (coro_name, filename, lineno)
else:
lineno = coro.gi_code.co_firstlineno
return '%s() done at %s:%s' % (coro_name, filename, lineno)
class Task(futures.Future): class Task(futures.Future):
"""A coroutine wrapped in a Future.""" """A coroutine wrapped in a Future."""
...@@ -193,7 +66,7 @@ class Task(futures.Future): ...@@ -193,7 +66,7 @@ class Task(futures.Future):
return {t for t in cls._all_tasks if t._loop is loop} return {t for t in cls._all_tasks if t._loop is loop}
def __init__(self, coro, *, loop=None): def __init__(self, coro, *, loop=None):
assert iscoroutine(coro), repr(coro) # Not a coroutine function! assert coroutines.iscoroutine(coro), repr(coro) # Not a coroutine function!
super().__init__(loop=loop) super().__init__(loop=loop)
if self._source_traceback: if self._source_traceback:
del self._source_traceback[-1] del self._source_traceback[-1]
...@@ -225,7 +98,7 @@ class Task(futures.Future): ...@@ -225,7 +98,7 @@ class Task(futures.Future):
else: else:
info.append(self._state.lower()) info.append(self._state.lower())
info.append(_format_coroutine(self._coro)) info.append(coroutines._format_coroutine(self._coro))
if self._state == futures._FINISHED: if self._state == futures._FINISHED:
info.append(self._format_result()) info.append(self._format_result())
...@@ -444,7 +317,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): ...@@ -444,7 +317,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Note: This does not raise TimeoutError! Futures that aren't done Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set. when the timeout occurs are returned in the second set.
""" """
if isinstance(fs, futures.Future) or iscoroutine(fs): if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__) raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs: if not fs:
raise ValueError('Set of coroutines/Futures is empty.') raise ValueError('Set of coroutines/Futures is empty.')
...@@ -566,7 +439,7 @@ def as_completed(fs, *, loop=None, timeout=None): ...@@ -566,7 +439,7 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs. Note: The futures 'f' are not necessarily members of fs.
""" """
if isinstance(fs, futures.Future) or iscoroutine(fs): if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__) raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop() loop = loop if loop is not None else events.get_event_loop()
todo = {async(f, loop=loop) for f in set(fs)} todo = {async(f, loop=loop) for f in set(fs)}
...@@ -624,7 +497,7 @@ def async(coro_or_future, *, loop=None): ...@@ -624,7 +497,7 @@ def async(coro_or_future, *, loop=None):
if loop is not None and loop is not coro_or_future._loop: if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future') raise ValueError('loop argument must agree with Future')
return coro_or_future return coro_or_future
elif iscoroutine(coro_or_future): elif coroutines.iscoroutine(coro_or_future):
task = Task(coro_or_future, loop=loop) task = Task(coro_or_future, loop=loop)
if task._source_traceback: if task._source_traceback:
del task._source_traceback[-1] del task._source_traceback[-1]
......
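
Task() and async() still assert they were handed a real coroutine object, now through coroutines.iscoroutine(); the helpers themselves are simply gone from this module. A small sketch (not part of the patch) of the scheduling entry points these hunks touch:

import asyncio

@asyncio.coroutine
def add(a, b):
    yield from asyncio.sleep(0)
    return a + b

loop = asyncio.get_event_loop()

# Task(coro) expects a coroutine *object*, not the coroutine function itself.
task = asyncio.Task(add(1, 2), loop=loop)
print(loop.run_until_complete(task))      # 3

# wait() rejects a bare coroutine or Future where it expects a sequence,
# per the isinstance()/iscoroutine() check shown above.
done, pending = loop.run_until_complete(
    asyncio.wait([add(3, 4), add(5, 6)], loop=loop))
print(sorted(t.result() for t in done))   # [7, 11]
loop.close()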
@@ -27,6 +27,7 @@ from . import events
 from . import futures
 from . import selectors
 from . import tasks
+from .coroutines import coroutine


 if sys.platform == 'win32':  # pragma: no cover
@@ -43,7 +44,7 @@ def dummy_ssl_context():


 def run_briefly(loop):
-    @tasks.coroutine
+    @coroutine
     def once():
         pass
     gen = once()
...
@@ -16,8 +16,8 @@ from . import base_subprocess
 from . import constants
 from . import events
 from . import selector_events
-from . import tasks
 from . import transports
+from .coroutines import coroutine
 from .log import logger

@@ -147,7 +147,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
                                         extra=None):
         return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

-    @tasks.coroutine
+    @coroutine
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
@@ -164,7 +164,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
     def _child_watcher_callback(self, pid, returncode, transp):
         self.call_soon_threadsafe(transp._process_exited, returncode)

-    @tasks.coroutine
+    @coroutine
     def create_unix_connection(self, protocol_factory, path, *,
                                ssl=None, sock=None,
                                server_hostname=None):
@@ -199,7 +199,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
             sock, protocol_factory, ssl, server_hostname)
         return transport, protocol

-    @tasks.coroutine
+    @coroutine
     def create_unix_server(self, protocol_factory, path=None, *,
                            sock=None, backlog=100, ssl=None):
         if isinstance(ssl, bool):
...
@@ -14,8 +14,9 @@ from . import proactor_events
 from . import selector_events
 from . import tasks
 from . import windows_utils
-from .log import logger
 from . import _overlapped
+from .coroutines import coroutine
+from .log import logger


 __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
@@ -129,7 +130,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
     def _socketpair(self):
         return windows_utils.socketpair()

-    @tasks.coroutine
+    @coroutine
     def create_pipe_connection(self, protocol_factory, address):
         f = self._proactor.connect_pipe(address)
         pipe = yield from f
@@ -138,7 +139,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
                                             extra={'addr': address})
         return trans, protocol

-    @tasks.coroutine
+    @coroutine
     def start_serving_pipe(self, protocol_factory, address):
         server = PipeServer(address)
@@ -172,7 +173,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
         self.call_soon(loop)
         return [server]

-    @tasks.coroutine
+    @coroutine
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
@@ -258,7 +259,7 @@ class IocpProactor:
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

-        @tasks.coroutine
+        @coroutine
         def accept_coro(future, conn):
             # Coroutine closing the accept socket if the future is cancelled
             try:
...
@@ -11,7 +11,7 @@ from test.script_helper import assert_python_ok
 from unittest import mock

 import asyncio
-from asyncio import tasks
+from asyncio import coroutines
 from asyncio import test_utils
@@ -193,7 +193,7 @@ class TaskTests(test_utils.TestCase):
             # attribute).
             coro_name = 'notmuch'
             coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch'
-        elif tasks._DEBUG:
+        elif coroutines._DEBUG:
             # In debug mode, @coroutine decorator uses CoroWrapper which gets
             # its name (__name__ attribute) from the wrapped coroutine
             # function.
@@ -1475,23 +1475,23 @@ class TaskTests(test_utils.TestCase):
            self.assertIsNone(gen.gi_frame)

         # Save debug flag.
-        old_debug = asyncio.tasks._DEBUG
+        old_debug = asyncio.coroutines._DEBUG
         try:
             # Test with debug flag cleared.
-            asyncio.tasks._DEBUG = False
+            asyncio.coroutines._DEBUG = False
             check()

             # Test with debug flag set.
-            asyncio.tasks._DEBUG = True
+            asyncio.coroutines._DEBUG = True
             check()

         finally:
             # Restore original debug flag.
-            asyncio.tasks._DEBUG = old_debug
+            asyncio.coroutines._DEBUG = old_debug

     def test_yield_from_corowrapper(self):
-        old_debug = asyncio.tasks._DEBUG
-        asyncio.tasks._DEBUG = True
+        old_debug = asyncio.coroutines._DEBUG
+        asyncio.coroutines._DEBUG = True
         try:
             @asyncio.coroutine
             def t1():
@@ -1511,7 +1511,7 @@ class TaskTests(test_utils.TestCase):
             val = self.loop.run_until_complete(task)
             self.assertEqual(val, (1, 2, 3))
         finally:
-            asyncio.tasks._DEBUG = old_debug
+            asyncio.coroutines._DEBUG = old_debug

     def test_yield_from_corowrapper_send(self):
         def foo():
@@ -1519,7 +1519,7 @@ class TaskTests(test_utils.TestCase):
             return a

         def call(arg):
-            cw = asyncio.tasks.CoroWrapper(foo(), foo)
+            cw = asyncio.coroutines.CoroWrapper(foo(), foo)
             cw.send(None)
             try:
                 cw.send(arg)
@@ -1534,7 +1534,7 @@ class TaskTests(test_utils.TestCase):

     def test_corowrapper_weakref(self):
         wd = weakref.WeakValueDictionary()
         def foo(): yield from []
-        cw = asyncio.tasks.CoroWrapper(foo(), foo)
+        cw = asyncio.coroutines.CoroWrapper(foo(), foo)
         wd['cw'] = cw  # Would fail without __weakref__ slot.
         cw.gen = None  # Suppress warning from __del__.
@@ -1580,16 +1580,16 @@ class TaskTests(test_utils.TestCase):
         })
         mock_handler.reset_mock()

-    @mock.patch('asyncio.tasks.logger')
+    @mock.patch('asyncio.coroutines.logger')
     def test_coroutine_never_yielded(self, m_log):
-        debug = asyncio.tasks._DEBUG
+        debug = asyncio.coroutines._DEBUG
         try:
-            asyncio.tasks._DEBUG = True
+            asyncio.coroutines._DEBUG = True

             @asyncio.coroutine
             def coro_noop():
                 pass
         finally:
-            asyncio.tasks._DEBUG = debug
+            asyncio.coroutines._DEBUG = debug

         tb_filename = __file__
         tb_lineno = sys._getframe().f_lineno + 1
@@ -1695,8 +1695,8 @@ class GatherTestsBase:

     def test_env_var_debug(self):
         code = '\n'.join((
-            'import asyncio.tasks',
-            'print(asyncio.tasks._DEBUG)'))
+            'import asyncio.coroutines',
+            'print(asyncio.coroutines._DEBUG)'))

         # Test with -E to not fail if the unit test was run with
         # PYTHONASYNCIODEBUG set to a non-empty string
...