Commit b6c0c5ba authored by Serhiy Storchaka's avatar Serhiy Storchaka

Merge heads

parents 2bef5857 9b524d59
...@@ -179,6 +179,7 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -179,6 +179,7 @@ class BaseEventLoop(events.AbstractEventLoop):
# In debug mode, if the execution of a callback or a step of a task # In debug mode, if the execution of a callback or a step of a task
# exceed this duration in seconds, the slow callback/task is logged. # exceed this duration in seconds, the slow callback/task is logged.
self.slow_callback_duration = 0.1 self.slow_callback_duration = 0.1
self._current_handle = None
def __repr__(self): def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>' return ('<%s running=%s closed=%s debug=%s>'
...@@ -723,7 +724,13 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -723,7 +724,13 @@ class BaseEventLoop(events.AbstractEventLoop):
logger.debug("Datagram endpoint remote_addr=%r created: " logger.debug("Datagram endpoint remote_addr=%r created: "
"(%r, %r)", "(%r, %r)",
remote_addr, transport, protocol) remote_addr, transport, protocol)
yield from waiter
try:
yield from waiter
except:
transport.close()
raise
return transport, protocol return transport, protocol
@coroutine @coroutine
...@@ -815,7 +822,13 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -815,7 +822,13 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol = protocol_factory() protocol = protocol_factory()
waiter = futures.Future(loop=self) waiter = futures.Future(loop=self)
transport = self._make_read_pipe_transport(pipe, protocol, waiter) transport = self._make_read_pipe_transport(pipe, protocol, waiter)
yield from waiter
try:
yield from waiter
except:
transport.close()
raise
if self._debug: if self._debug:
logger.debug('Read pipe %r connected: (%r, %r)', logger.debug('Read pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol) pipe.fileno(), transport, protocol)
...@@ -826,7 +839,13 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -826,7 +839,13 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol = protocol_factory() protocol = protocol_factory()
waiter = futures.Future(loop=self) waiter = futures.Future(loop=self)
transport = self._make_write_pipe_transport(pipe, protocol, waiter) transport = self._make_write_pipe_transport(pipe, protocol, waiter)
yield from waiter
try:
yield from waiter
except:
transport.close()
raise
if self._debug: if self._debug:
logger.debug('Write pipe %r connected: (%r, %r)', logger.debug('Write pipe %r connected: (%r, %r)',
pipe.fileno(), transport, protocol) pipe.fileno(), transport, protocol)
...@@ -937,6 +956,10 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -937,6 +956,10 @@ class BaseEventLoop(events.AbstractEventLoop):
else: else:
exc_info = False exc_info = False
if (self._current_handle is not None
and self._current_handle._source_traceback):
context['handle_traceback'] = self._current_handle._source_traceback
log_lines = [message] log_lines = [message]
for key in sorted(context): for key in sorted(context):
if key in {'message', 'exception'}: if key in {'message', 'exception'}:
...@@ -946,6 +969,10 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -946,6 +969,10 @@ class BaseEventLoop(events.AbstractEventLoop):
tb = ''.join(traceback.format_list(value)) tb = ''.join(traceback.format_list(value))
value = 'Object created at (most recent call last):\n' value = 'Object created at (most recent call last):\n'
value += tb.rstrip() value += tb.rstrip()
elif key == 'handle_traceback':
tb = ''.join(traceback.format_list(value))
value = 'Handle created at (most recent call last):\n'
value += tb.rstrip()
else: else:
value = repr(value) value = repr(value)
log_lines.append('{}: {}'.format(key, value)) log_lines.append('{}: {}'.format(key, value))
...@@ -1103,12 +1130,16 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -1103,12 +1130,16 @@ class BaseEventLoop(events.AbstractEventLoop):
if handle._cancelled: if handle._cancelled:
continue continue
if self._debug: if self._debug:
t0 = self.time() try:
handle._run() self._current_handle = handle
dt = self.time() - t0 t0 = self.time()
if dt >= self.slow_callback_duration: handle._run()
logger.warning('Executing %s took %.3f seconds', dt = self.time() - t0
_format_handle(handle), dt) if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else: else:
handle._run() handle._run()
handle = None # Needed to break cycles when an exception occurs. handle = None # Needed to break cycles when an exception occurs.
......
...@@ -694,12 +694,7 @@ class IocpProactor: ...@@ -694,12 +694,7 @@ class IocpProactor:
def close(self): def close(self):
# Cancel remaining registered operations. # Cancel remaining registered operations.
for address, (fut, ov, obj, callback) in list(self._cache.items()): for address, (fut, ov, obj, callback) in list(self._cache.items()):
if obj is None: if fut.cancelled():
# The operation was started with connect_pipe() which
# queues a task to Windows' thread pool. This cannot
# be cancelled, so just forget it.
del self._cache[address]
elif fut.cancelled():
# Nothing to do with cancelled futures # Nothing to do with cancelled futures
pass pass
elif isinstance(fut, _WaitCancelFuture): elif isinstance(fut, _WaitCancelFuture):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment