Commit d0a28dee authored by Victor Stinner's avatar Victor Stinner

Issue #23095, asyncio: Rewrite _WaitHandleFuture.cancel()

This change fixes a race conditon related to _WaitHandleFuture.cancel() leading
to Python crash or "GetQueuedCompletionStatus() returned an unexpected event"
logs. Before, the overlapped object was destroyed too early, it was possible
that the wait completed whereas the overlapped object was already destroyed.
Sometimes, a different overlapped was allocated at the same address, leading to
unexpected completition.

_WaitHandleFuture.cancel() now waits until the wait is cancelled to clear its
reference to the overlapped object. To wait until the cancellation is done,
UnregisterWaitEx() is used with an event instead of UnregisterWait().

To wait for this event, a new _WaitCancelFuture class was added. It's a
simplified version of _WaitCancelFuture. For example, its cancel() method calls
UnregisterWait(), not UnregisterWaitEx(). _WaitCancelFuture should not be
cancelled.

The overlapped object is kept alive in _WaitHandleFuture until the wait is
unregistered.

Other changes:

* Add _overlapped.UnregisterWaitEx()
* Remove fast-path in IocpProactor.wait_for_handle() to immediatly set the
  result if the wait already completed. I'm not sure that it's safe to
  call immediatly UnregisterWaitEx() before the completion was signaled.
* Add IocpProactor._unregistered() to forget an overlapped which may never be
  signaled, but may be signaled for the next loop iteration. It avoids to
  block forever IocpProactor.close() if a wait was cancelled, and it may also
  avoid some "... unexpected event ..." warnings.
parent 442b0adc
...@@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future): ...@@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future):
self._ov = None self._ov = None
class _WaitHandleFuture(futures.Future): class _BaseWaitHandleFuture(futures.Future):
"""Subclass of Future which represents a wait handle.""" """Subclass of Future which represents a wait handle."""
def __init__(self, iocp, ov, handle, wait_handle, *, loop=None): def __init__(self, ov, handle, wait_handle, *, loop=None):
super().__init__(loop=loop) super().__init__(loop=loop)
if self._source_traceback: if self._source_traceback:
del self._source_traceback[-1] del self._source_traceback[-1]
# iocp and ov are only used by cancel() to notify IocpProactor # Keep a reference to the Overlapped object to keep it alive until the
# that the wait was cancelled # wait is unregistered
self._iocp = iocp
self._ov = ov self._ov = ov
self._handle = handle self._handle = handle
self._wait_handle = wait_handle self._wait_handle = wait_handle
# Should we call UnregisterWaitEx() if the wait completes
# or is cancelled?
self._registered = True
def _poll(self): def _poll(self):
# non-blocking wait: use a timeout of 0 millisecond # non-blocking wait: use a timeout of 0 millisecond
return (_winapi.WaitForSingleObject(self._handle, 0) == return (_winapi.WaitForSingleObject(self._handle, 0) ==
...@@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future): ...@@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future):
def _repr_info(self): def _repr_info(self):
info = super()._repr_info() info = super()._repr_info()
info.insert(1, 'handle=%#x' % self._handle) info.append('handle=%#x' % self._handle)
if self._wait_handle: if self._handle is not None:
state = 'signaled' if self._poll() else 'waiting' state = 'signaled' if self._poll() else 'waiting'
info.insert(1, 'wait_handle=<%s, %#x>' info.append(state)
% (state, self._wait_handle)) if self._wait_handle is not None:
info.append('wait_handle=%#x' % self._wait_handle)
return info return info
def _unregister_wait_cb(self, fut):
# The wait was unregistered: it's not safe to destroy the Overlapped
# object
self._ov = None
def _unregister_wait(self): def _unregister_wait(self):
if self._wait_handle is None: if not self._registered:
return return
self._registered = False
try: try:
_overlapped.UnregisterWait(self._wait_handle) _overlapped.UnregisterWait(self._wait_handle)
except OSError as exc: except OSError as exc:
# ERROR_IO_PENDING is not an error, the wait was unregistered self._wait_handle = None
if exc.winerror != _overlapped.ERROR_IO_PENDING: if exc.winerror == _overlapped.ERROR_IO_PENDING:
# ERROR_IO_PENDING is not an error, the wait was unregistered
self._unregister_wait_cb(None)
elif exc.winerror != _overlapped.ERROR_IO_PENDING:
context = { context = {
'message': 'Failed to unregister the wait handle', 'message': 'Failed to unregister the wait handle',
'exception': exc, 'exception': exc,
...@@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future): ...@@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future):
if self._source_traceback: if self._source_traceback:
context['source_traceback'] = self._source_traceback context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context) self._loop.call_exception_handler(context)
self._wait_handle = None else:
self._iocp = None self._wait_handle = None
self._ov = None self._unregister_wait_cb(None)
def cancel(self): def cancel(self):
result = super().cancel()
if self._ov is not None:
# signal the cancellation to the overlapped object
_overlapped.PostQueuedCompletionStatus(self._iocp, True,
0, self._ov.address)
self._unregister_wait() self._unregister_wait()
return result return super().cancel()
def set_exception(self, exception): def set_exception(self, exception):
super().set_exception(exception)
self._unregister_wait() self._unregister_wait()
super().set_exception(exception)
def set_result(self, result): def set_result(self, result):
super().set_result(result)
self._unregister_wait() self._unregister_wait()
super().set_result(result)
class _WaitCancelFuture(_BaseWaitHandleFuture):
"""Subclass of Future which represents a wait for the cancellation of a
_WaitHandleFuture using an event.
"""
def __init__(self, ov, event, wait_handle, *, loop=None):
super().__init__(ov, event, wait_handle, loop=loop)
self._done_callback = None
def _schedule_callbacks(self):
super(_WaitCancelFuture, self)._schedule_callbacks()
if self._done_callback is not None:
self._done_callback(self)
class _WaitHandleFuture(_BaseWaitHandleFuture):
def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
super().__init__(ov, handle, wait_handle, loop=loop)
self._proactor = proactor
self._unregister_proactor = True
self._event = _overlapped.CreateEvent(None, True, False, None)
self._event_fut = None
def _unregister_wait_cb(self, fut):
if self._event is not None:
_winapi.CloseHandle(self._event)
self._event = None
self._event_fut = None
# If the wait was cancelled, the wait may never be signalled, so
# it's required to unregister it. Otherwise, IocpProactor.close() will
# wait forever for an event which will never come.
#
# If the IocpProactor already received the event, it's safe to call
# _unregister() because we kept a reference to the Overlapped object
# which is used as an unique key.
self._proactor._unregister(self._ov)
self._proactor = None
super()._unregister_wait_cb(fut)
def _unregister_wait(self):
if not self._registered:
return
self._registered = False
try:
_overlapped.UnregisterWaitEx(self._wait_handle, self._event)
except OSError as exc:
self._wait_handle = None
if exc.winerror == _overlapped.ERROR_IO_PENDING:
# ERROR_IO_PENDING is not an error, the wait was unregistered
self._unregister_wait_cb(None)
elif exc.winerror != _overlapped.ERROR_IO_PENDING:
context = {
'message': 'Failed to unregister the wait handle',
'exception': exc,
'future': self,
}
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
else:
self._wait_handle = None
self._event_fut = self._proactor._wait_cancel(
self._event,
self._unregister_wait_cb)
class PipeServer(object): class PipeServer(object):
...@@ -291,6 +370,7 @@ class IocpProactor: ...@@ -291,6 +370,7 @@ class IocpProactor:
_overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
self._cache = {} self._cache = {}
self._registered = weakref.WeakSet() self._registered = weakref.WeakSet()
self._unregistered = []
self._stopped_serving = weakref.WeakSet() self._stopped_serving = weakref.WeakSet()
def __repr__(self): def __repr__(self):
...@@ -438,6 +518,16 @@ class IocpProactor: ...@@ -438,6 +518,16 @@ class IocpProactor:
Return a Future object. The result of the future is True if the wait Return a Future object. The result of the future is True if the wait
completed, or False if the wait did not complete (on timeout). completed, or False if the wait did not complete (on timeout).
""" """
return self._wait_for_handle(handle, timeout, False)
def _wait_cancel(self, event, done_callback):
fut = self._wait_for_handle(event, None, True)
# add_done_callback() cannot be used because the wait may only complete
# in IocpProactor.close(), while the event loop is not running.
fut._done_callback = done_callback
return fut
def _wait_for_handle(self, handle, timeout, _is_cancel):
if timeout is None: if timeout is None:
ms = _winapi.INFINITE ms = _winapi.INFINITE
else: else:
...@@ -447,9 +537,13 @@ class IocpProactor: ...@@ -447,9 +537,13 @@ class IocpProactor:
# We only create ov so we can use ov.address as a key for the cache. # We only create ov so we can use ov.address as a key for the cache.
ov = _overlapped.Overlapped(NULL) ov = _overlapped.Overlapped(NULL)
wh = _overlapped.RegisterWaitWithQueue( wait_handle = _overlapped.RegisterWaitWithQueue(
handle, self._iocp, ov.address, ms) handle, self._iocp, ov.address, ms)
f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop) if _is_cancel:
f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
else:
f = _WaitHandleFuture(ov, handle, wait_handle, self,
loop=self._loop)
if f._source_traceback: if f._source_traceback:
del f._source_traceback[-1] del f._source_traceback[-1]
...@@ -462,14 +556,6 @@ class IocpProactor: ...@@ -462,14 +556,6 @@ class IocpProactor:
# False even though we have not timed out. # False even though we have not timed out.
return f._poll() return f._poll()
if f._poll():
try:
result = f._poll()
except OSError as exc:
f.set_exception(exc)
else:
f.set_result(result)
self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
return f return f
...@@ -521,6 +607,15 @@ class IocpProactor: ...@@ -521,6 +607,15 @@ class IocpProactor:
self._cache[ov.address] = (f, ov, obj, callback) self._cache[ov.address] = (f, ov, obj, callback)
return f return f
def _unregister(self, ov):
"""Unregister an overlapped object.
Call this method when its future has been cancelled. The event can
already be signalled (pending in the proactor event queue). It is also
safe if the event is never signalled (because it was cancelled).
"""
self._unregistered.append(ov)
def _get_accept_socket(self, family): def _get_accept_socket(self, family):
s = socket.socket(family) s = socket.socket(family)
s.settimeout(0) s.settimeout(0)
...@@ -541,7 +636,7 @@ class IocpProactor: ...@@ -541,7 +636,7 @@ class IocpProactor:
while True: while True:
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None: if status is None:
return break
ms = 0 ms = 0
err, transferred, key, address = status err, transferred, key, address = status
...@@ -576,6 +671,11 @@ class IocpProactor: ...@@ -576,6 +671,11 @@ class IocpProactor:
f.set_result(value) f.set_result(value)
self._results.append(f) self._results.append(f)
# Remove unregisted futures
for ov in self._unregistered:
self._cache.pop(ov.address, None)
self._unregistered.clear()
def _stop_serving(self, obj): def _stop_serving(self, obj):
# obj is a socket or pipe handle. It will be closed in # obj is a socket or pipe handle. It will be closed in
# BaseProactorEventLoop._stop_serving() which will make any # BaseProactorEventLoop._stop_serving() which will make any
......
...@@ -309,6 +309,29 @@ overlapped_UnregisterWait(PyObject *self, PyObject *args) ...@@ -309,6 +309,29 @@ overlapped_UnregisterWait(PyObject *self, PyObject *args)
Py_RETURN_NONE; Py_RETURN_NONE;
} }
PyDoc_STRVAR(
UnregisterWaitEx_doc,
"UnregisterWaitEx(WaitHandle, Event) -> None\n\n"
"Unregister wait handle.\n");
static PyObject *
overlapped_UnregisterWaitEx(PyObject *self, PyObject *args)
{
HANDLE WaitHandle, Event;
BOOL ret;
if (!PyArg_ParseTuple(args, F_HANDLE F_HANDLE, &WaitHandle, &Event))
return NULL;
Py_BEGIN_ALLOW_THREADS
ret = UnregisterWaitEx(WaitHandle, Event);
Py_END_ALLOW_THREADS
if (!ret)
return SetFromWindowsErr(0);
Py_RETURN_NONE;
}
/* /*
* Event functions -- currently only used by tests * Event functions -- currently only used by tests
*/ */
...@@ -1319,6 +1342,8 @@ static PyMethodDef overlapped_functions[] = { ...@@ -1319,6 +1342,8 @@ static PyMethodDef overlapped_functions[] = {
METH_VARARGS, RegisterWaitWithQueue_doc}, METH_VARARGS, RegisterWaitWithQueue_doc},
{"UnregisterWait", overlapped_UnregisterWait, {"UnregisterWait", overlapped_UnregisterWait,
METH_VARARGS, UnregisterWait_doc}, METH_VARARGS, UnregisterWait_doc},
{"UnregisterWaitEx", overlapped_UnregisterWaitEx,
METH_VARARGS, UnregisterWaitEx_doc},
{"CreateEvent", overlapped_CreateEvent, {"CreateEvent", overlapped_CreateEvent,
METH_VARARGS, CreateEvent_doc}, METH_VARARGS, CreateEvent_doc},
{"SetEvent", overlapped_SetEvent, {"SetEvent", overlapped_SetEvent,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment