Commit e4a04e89 authored by Jason Madden's avatar Jason Madden

Fix destroying the loop and properly close handles.

Sadly this exposes the fact that libuv doesn't allow multiple handles
for a given poll fd. Nor does it raise clear errors until you try to
close them in an order different from the inverse of the one they were
created in. Doing so simply aborts the process when libuv realises that
the watcher it has registered is not the watcher you're closing.

This further means that multiple watchers wont' get the right set of
events delivered, I think (though most of the tests were passing until I
discovered this crash).
parent 5d0cc66a
......@@ -150,10 +150,13 @@ class watcher(object):
def _watcher_create(self, ref): # pylint:disable=unused-argument
self._handle = type(self).new_handle(self)
self._watcher = type(self).new(self._watcher_struct_pointer_type)
self._watcher = self._watcher_new()
# XXX: Must GC the _watche in libuv: uv_close()
self._watcher.data = self._handle
def _watcher_new(self):
return type(self).new(self._watcher_struct_pointer_type)
def _watcher_ffi_set_priority(self, priority):
pass
......
......@@ -3,6 +3,9 @@
#define GEVENT_ST_NLINK_T int
#define GEVENT_UV_OS_SOCK_T int
#define UV_EBUSY ...
typedef enum {
UV_RUN_DEFAULT = 0,
UV_RUN_ONCE,
......@@ -115,7 +118,7 @@ typedef struct uv_fs_poll_s uv_fs_poll_t;
// callbacks with the same signature
typedef void (*uv_close_cb)(void *handle);
typedef void (*uv_close_cb)(uv_handle_t *handle);
typedef void (*uv_idle_cb)(void *handle);
typedef void (*uv_timer_cb)(void *handle);
typedef void (*uv_check_cb)(void* handle);
......
......@@ -89,7 +89,27 @@ class loop(AbstractLoop):
if self._ptr:
ptr = self._ptr
super(loop, self).destroy()
libuv.uv_loop_close(ptr)
libuv.uv_stop(ptr)
closed_failed = libuv.uv_loop_close(ptr)
if closed_failed:
assert closed_failed == libuv.UV_EBUSY
# Walk the open handlers, close them, then
# run the loop once to clear them out and
# close again.
def walk(handle, _arg):
libuv.uv_close(handle, ffi.NULL)
libuv.uv_walk(ptr,
ffi.callback("void(*)(uv_handle_t*,void*)",
walk),
ffi.NULL)
ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
if ran_has_more_callbacks:
libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT)
closed_failed = libuv.uv_loop_close(ptr)
assert not closed_failed, closed_failed
def ref(self):
pass
......@@ -134,6 +154,17 @@ class loop(AbstractLoop):
if nowait:
mode = libuv.UV_RUN_NOWAIT
# if mode == libuv.UV_RUN_DEFAULT:
# print("looping in python")
# ptr = self._ptr
# ran_error = 0
# while ran_error == 0:
# ran_error = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
# if ran_error != 0:
# print("Error running loop", libuv.uv_err_name(ran_error),
# libuv.uv_strerror(ran_error))
# return ran_error
return libuv.uv_run(self._ptr, mode)
def now(self):
......
# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable
from __future__ import absolute_import, print_function
import os
import sys
import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error
ffi = _corecffi.ffi # pylint:disable=no-member
......@@ -12,6 +9,12 @@ libuv = _corecffi.lib # pylint:disable=no-member
from gevent._ffi import watcher as _base
_closing_handles = set()
@ffi.callback("void(*)(uv_handle_t*)")
def _uv_close_callback(handle):
_closing_handles.remove(handle)
class watcher(_base.watcher):
_FFI = ffi
......@@ -21,6 +24,31 @@ class watcher(_base.watcher):
_watcher_struct_pattern = '%s_t'
_watcher_callback_name = '_gevent_generic_callback0'
def __del__(self):
# Managing the lifetime of _watcher is tricky.
# They have to be uv_close()'d, but that only
# queues them to be closed in the *next* loop iteration.
# The memory most stay valid for at least that long,
# or assert errors are triggered. We can't use a ffi.gc()
# pointer to queue the uv_close, because by the time the
# destructor is called, there's no way to keep the memory alive
# and it could be re-used.
# So here we resort to resurrecting the pointer object out
# of our scope, keeping it alive past this object's lifetime.
# We then use the uv_close callback to handle removing that
# reference. There's no context passed to the closs callback,
# so we have to do this globally.
# Sadly, doing this causes crashes if there were multiple
# watchers for a given FD. See https://github.com/gevent/gevent/issues/790#issuecomment-208076604
#print("Del", ffi.cast('void*', self._watcher), 'started', libuv.uv_is_active(self._watcher), type(self), id(self))
#if hasattr(self, '_fd'):
# print("FD", self._fd)
if not libuv.uv_is_closing(self._watcher):
self._watcher.data = ffi.NULL
_closing_handles.add(self._watcher)
libuv.uv_close(self._watcher, _uv_close_callback)
self._watcher = None
def _watcher_ffi_set_priority(self, priority):
# libuv has no concept of priority
......
......@@ -21,4 +21,5 @@ timer = loop.timer(0.5)
timer.start(lambda: None)
loop.run()
loop.destroy()
del loop
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment