Commit 9f2b1fd9 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1492 from gevent/issue1331

Improve safety for libuv async and idle watchers.
parents 062086d2 6bed079a
...@@ -46,6 +46,11 @@ ...@@ -46,6 +46,11 @@
- Make ``gevent.pywsgi`` support ``Connection: keep-alive`` in - Make ``gevent.pywsgi`` support ``Connection: keep-alive`` in
HTTP/1.0. Based on :pr:`1331` by tanchuhan. HTTP/1.0. Based on :pr:`1331` by tanchuhan.
- Fix a potential crash using ``gevent.idle()`` when using libuv. See
:issue:`1489`.
- Fix some potential crashes using libuv async watchers.
1.5a2 (2019-10-21) 1.5a2 (2019-10-21)
================== ==================
......
...@@ -123,8 +123,8 @@ class AbstractCallbacks(object): ...@@ -123,8 +123,8 @@ class AbstractCallbacks(object):
args = _NOARGS args = _NOARGS
if args and args[0] == GEVENT_CORE_EVENTS: if args and args[0] == GEVENT_CORE_EVENTS:
args = (revents, ) + args[1:] args = (revents, ) + args[1:]
#print("Calling function", the_watcher.callback, args) #_dbg("Calling function", the_watcher.callback, args)
the_watcher.callback(*args) the_watcher.callback(*args) # None here means we weren't started
except: # pylint:disable=bare-except except: # pylint:disable=bare-except
_dbg("Got exception servicing watcher with handle", handle, sys.exc_info()) _dbg("Got exception servicing watcher with handle", handle, sys.exc_info())
# It's possible for ``the_watcher`` to be undefined (UnboundLocalError) # It's possible for ``the_watcher`` to be undefined (UnboundLocalError)
......
...@@ -364,7 +364,6 @@ class watcher(object): ...@@ -364,7 +364,6 @@ class watcher(object):
# may fail if __init__ did; will be harmlessly printed # may fail if __init__ did; will be harmlessly printed
self.close() self.close()
def __repr__(self): def __repr__(self):
formats = self._format() formats = self._format()
result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats) result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats)
...@@ -398,7 +397,7 @@ class watcher(object): ...@@ -398,7 +397,7 @@ class watcher(object):
raise NotImplementedError() raise NotImplementedError()
def _get_callback(self): def _get_callback(self):
return self._callback return self._callback if '_callback' in self.__dict__ else None
def _set_callback(self, cb): def _set_callback(self, cb):
if not callable(cb) and cb is not None: if not callable(cb) and cb is not None:
...@@ -435,15 +434,18 @@ class watcher(object): ...@@ -435,15 +434,18 @@ class watcher(object):
self._watcher_ffi_start_unref() self._watcher_ffi_start_unref()
def stop(self): def stop(self):
if self._callback is None: if self.callback is None:
assert self.loop is None or self not in self.loop._keepaliveset assert self.loop is None or self not in self.loop._keepaliveset
return return
self.callback = None
# Only after setting the signal to make this idempotent do
# we move ahead.
self._watcher_ffi_stop_ref() self._watcher_ffi_stop_ref()
self._watcher_ffi_stop() self._watcher_ffi_stop()
self.loop._keepaliveset.discard(self) self.loop._keepaliveset.discard(self)
self._handle = None self._handle = None
self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member
self.callback = None
self.args = None self.args = None
def _get_priority(self): def _get_priority(self):
......
...@@ -25,7 +25,6 @@ start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name, ...@@ -25,7 +25,6 @@ start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name,
# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return # pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return
# pylint:disable=assignment-from-no-return # pylint:disable=assignment-from-no-return
class _Condition(object): class _Condition(object):
# pylint:disable=method-hidden # pylint:disable=method-hidden
...@@ -33,21 +32,9 @@ class _Condition(object): ...@@ -33,21 +32,9 @@ class _Condition(object):
self.__lock = lock self.__lock = lock
self.__waiters = [] self.__waiters = []
# If the lock defines _release_save() and/or _acquire_restore(), # No need to special case for _release_save and
# these override the default implementations (which just call # _acquire_restore; those are only used for RLock, and
# release() and acquire() on the lock). Ditto for _is_owned(). # we don't use those.
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
def __enter__(self): def __enter__(self):
return self.__lock.__enter__() return self.__lock.__enter__()
...@@ -58,36 +45,28 @@ class _Condition(object): ...@@ -58,36 +45,28 @@ class _Condition(object):
def __repr__(self): def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
def _release_save(self):
self.__lock.release() # No state to save
def _acquire_restore(self, x): # pylint:disable=unused-argument
self.__lock.acquire() # Ignore saved state
def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if __lock doesn't have _is_owned().
if self.__lock.acquire(0):
self.__lock.release()
return False
return True
def wait(self): def wait(self):
# The condition MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
saved_state = self._release_save()
# This variable is for the monitoring utils to know that # This variable is for the monitoring utils to know that
# this is an idle frame and shouldn't be counted. # this is an idle frame and shouldn't be counted.
gevent_threadpool_worker_idle = True # pylint:disable=unused-variable gevent_threadpool_worker_idle = True # pylint:disable=unused-variable
try: # restore state no matter what (e.g., KeyboardInterrupt)
# Our __lock MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
self.__lock.release()
try:
waiter.acquire() # Block on the native lock waiter.acquire() # Block on the native lock
finally: finally:
self._acquire_restore(saved_state) self.__lock.acquire()
# just good form to release the lock we're holding before it goes
# out of scope
waiter.release()
def notify_one(self): def notify_one(self):
# The condition MUST be owned, but we don't check that. # The lock SHOULD be owned, but we don't check that.
try: try:
waiter = self.__waiters.pop() waiter = self.__waiters.pop()
except IndexError: except IndexError:
...@@ -154,7 +133,7 @@ class Queue(object): ...@@ -154,7 +133,7 @@ class Queue(object):
def put(self, item): def put(self, item):
"""Put an item into the queue. """Put an item into the queue.
""" """
with self._not_empty: with self._mutex:
self._queue.append(item) self._queue.append(item)
self.unfinished_tasks += 1 self.unfinished_tasks += 1
self._not_empty.notify_one() self._not_empty.notify_one()
...@@ -162,7 +141,7 @@ class Queue(object): ...@@ -162,7 +141,7 @@ class Queue(object):
def get(self): def get(self):
"""Remove and return an item from the queue. """Remove and return an item from the queue.
""" """
with self._not_empty: with self._mutex:
while not self._queue: while not self._queue:
self._not_empty.wait() self._not_empty.wait()
item = self._queue.popleft() item = self._queue.popleft()
......
...@@ -171,10 +171,10 @@ def idle(priority=0): ...@@ -171,10 +171,10 @@ def idle(priority=0):
.. seealso:: :func:`sleep` .. seealso:: :func:`sleep`
""" """
hub = _get_hub_noargs() hub = _get_hub_noargs()
watcher = hub.loop.idle() with hub.loop.idle() as watcher:
if priority: if priority:
watcher.priority = priority watcher.priority = priority
hub.wait(watcher) hub.wait(watcher)
def kill(greenlet, exception=GreenletExit): def kill(greenlet, exception=GreenletExit):
......
...@@ -138,7 +138,6 @@ struct uv_async_s { ...@@ -138,7 +138,6 @@ struct uv_async_s {
struct uv_loop_s* loop; struct uv_loop_s* loop;
uv_handle_type type; uv_handle_type type;
void *data; void *data;
void (*async_cb)(struct uv_async_s *);
GEVENT_STRUCT_DONE _; GEVENT_STRUCT_DONE _;
}; };
......
...@@ -16,6 +16,8 @@ libuv = _corecffi.lib ...@@ -16,6 +16,8 @@ libuv = _corecffi.lib
from gevent._ffi import watcher as _base from gevent._ffi import watcher as _base
from gevent._ffi import _dbg from gevent._ffi import _dbg
# A set of uv_handle_t* CFFI objects. Kept around
# to keep the memory alive until libuv is done with them.
_closing_watchers = set() _closing_watchers = set()
# In debug mode, it would be nice to be able to clear the memory of # In debug mode, it would be nice to be able to clear the memory of
...@@ -27,7 +29,9 @@ _closing_watchers = set() ...@@ -27,7 +29,9 @@ _closing_watchers = set()
# crash) suggesting either that we're writing on memory that doesn't # crash) suggesting either that we're writing on memory that doesn't
# belong to us, somehow, or that we haven't actually lost all # belong to us, somehow, or that we haven't actually lost all
# references... # references...
_uv_close_callback = ffi.def_extern(name='_uv_close_callback')(_closing_watchers.remove) _uv_close_callback = ffi.def_extern(name='_uv_close_callback')(
_closing_watchers.remove
)
_events = [(libuv.UV_READABLE, "READ"), _events = [(libuv.UV_READABLE, "READ"),
...@@ -125,6 +129,8 @@ class watcher(_base.watcher): ...@@ -125,6 +129,8 @@ class watcher(_base.watcher):
# but that don't in CFFI without a cast. But be careful what we use the cast # but that don't in CFFI without a cast. But be careful what we use the cast
# for, don't pass it back to C. # for, don't pass it back to C.
ffi_handle_watcher = cls._FFI.cast('uv_handle_t*', ffi_watcher) ffi_handle_watcher = cls._FFI.cast('uv_handle_t*', ffi_watcher)
ffi_handle_watcher.data = ffi.NULL
if ffi_handle_watcher.type and not libuv.uv_is_closing(ffi_watcher): if ffi_handle_watcher.type and not libuv.uv_is_closing(ffi_watcher):
# If the type isn't set, we were never properly initialized, # If the type isn't set, we were never properly initialized,
# and trying to close it results in libuv terminating the process. # and trying to close it results in libuv terminating the process.
...@@ -133,9 +139,6 @@ class watcher(_base.watcher): ...@@ -133,9 +139,6 @@ class watcher(_base.watcher):
_closing_watchers.add(ffi_watcher) _closing_watchers.add(ffi_watcher)
libuv.uv_close(ffi_watcher, libuv._uv_close_callback) libuv.uv_close(ffi_watcher, libuv._uv_close_callback)
ffi_handle_watcher.data = ffi.NULL
def _watcher_ffi_set_init_ref(self, ref): def _watcher_ffi_set_init_ref(self, ref):
self.ref = ref self.ref = ref
...@@ -548,33 +551,39 @@ class child(_SimulatedWithAsyncMixin, ...@@ -548,33 +551,39 @@ class child(_SimulatedWithAsyncMixin,
class async_(_base.AsyncMixin, watcher): class async_(_base.AsyncMixin, watcher):
_watcher_callback_name = '_gevent_async_callback0' _watcher_callback_name = '_gevent_async_callback0'
# libuv async watchers are different than all other watchers:
# They don't have a separate start/stop method (presumably
# because of race conditions). Simply initing them places them
# into the active queue.
#
# In the past, we sent a NULL C callback to the watcher, trusting
# that no one would call send() without actually starting us (or after
# closing us); doing so would crash. But we don't want to delay
# initing the struct because it will crash in uv_close() when we get GC'd,
# and send() will also crash. Plus that complicates our lifecycle (managing
# the memory).
#
# Now, we always init the correct C callback, and use a dummy
# Python callback that gets replaced when we are started and
# stopped. This prevents mistakes from being crashes.
_callback = lambda: None
def _watcher_ffi_init(self, args): def _watcher_ffi_init(self, args):
# It's dangerous to have a raw, non-initted struct
# around; it will crash in uv_close() when we get GC'd,
# and send() will also crash.
# NOTE: uv_async_init is NOT idempotent. Calling it more than # NOTE: uv_async_init is NOT idempotent. Calling it more than
# once adds the uv_async_t to the internal queue multiple times, # once adds the uv_async_t to the internal queue multiple times,
# and uv_close only cleans up one of them, meaning that we tend to # and uv_close only cleans up one of them, meaning that we tend to
# crash. Thus we have to be very careful not to allow that. # crash. Thus we have to be very careful not to allow that.
return self._watcher_init(self.loop.ptr, self._watcher, ffi.NULL) return self._watcher_init(self.loop.ptr, self._watcher,
self._watcher_callback)
def _watcher_ffi_start(self): def _watcher_ffi_start(self):
# we're created in a started state, but we didn't provide a pass
# callback (because if we did and we don't have a value in our
# callback attribute, then python_callback would crash.) Note that
# uv_async_t->async_cb is not technically documented as public.
self._watcher.async_cb = self._watcher_callback
def _watcher_ffi_stop(self): def _watcher_ffi_stop(self):
self._watcher.async_cb = ffi.NULL pass
# We have to unref this because we're setting the cb behind libuv's
# back, basically: once a async watcher is started, it can't ever be
# stopped through libuv interfaces, so it would never lose its active
# status, and thus if it stays reffed it would keep the event loop
# from exiting.
self._watcher_ffi_unref()
def send(self): def send(self):
assert self._callback is not async_._callback, "Sending to a closed watcher"
if libuv.uv_is_closing(self._watcher): if libuv.uv_is_closing(self._watcher):
raise Exception("Closing handle") raise Exception("Closing handle")
libuv.uv_async_send(self._watcher) libuv.uv_async_send(self._watcher)
......
...@@ -201,7 +201,10 @@ class ThreadPool(GroupMappingMixin): ...@@ -201,7 +201,10 @@ class ThreadPool(GroupMappingMixin):
""" """
Add a new task to the threadpool that will run ``func(*args, **kwargs)``. Add a new task to the threadpool that will run ``func(*args, **kwargs)``.
Waits until a slot is available. Creates a new thread if necessary. Waits until a slot is available. Creates a new native thread if necessary.
This must only be called from the native thread that owns this object's
hub.
:return: A :class:`gevent.event.AsyncResult`. :return: A :class:`gevent.event.AsyncResult`.
""" """
...@@ -219,12 +222,13 @@ class ThreadPool(GroupMappingMixin): ...@@ -219,12 +222,13 @@ class ThreadPool(GroupMappingMixin):
# we get LoopExit (why?). Previously it was done with a rawlink on the # we get LoopExit (why?). Previously it was done with a rawlink on the
# AsyncResult and the comment that it is "competing for order with get(); this is not # AsyncResult and the comment that it is "competing for order with get(); this is not
# good, just make ThreadResult release the semaphore before doing anything else" # good, just make ThreadResult release the semaphore before doing anything else"
assert self.hub == get_hub()
thread_result = ThreadResult(result, self.hub, semaphore.release) thread_result = ThreadResult(result, self.hub, semaphore.release)
task_queue.put((func, args, kwargs, thread_result)) task_queue.put((func, args, kwargs, thread_result))
self.adjust() self.adjust()
except: except:
if thread_result is not None: if thread_result is not None:
thread_result.destroy() thread_result.destroy_in_main_thread()
semaphore.release() semaphore.release()
raise raise
return result return result
...@@ -370,8 +374,14 @@ class ThreadResult(object): ...@@ -370,8 +374,14 @@ class ThreadResult(object):
return self.exc_info[1] if self.exc_info else None return self.exc_info[1] if self.exc_info else None
def _on_async(self): def _on_async(self):
self.async_watcher.stop() # Called in the hub thread.
self.async_watcher.close()
aw = self.async_watcher
self.async_watcher = _FakeAsync
aw.stop()
aw.close()
# Typically this is pool.semaphore.release and we have to # Typically this is pool.semaphore.release and we have to
# call this in the Hub; if we don't we get the dreaded # call this in the Hub; if we don't we get the dreaded
...@@ -393,7 +403,10 @@ class ThreadResult(object): ...@@ -393,7 +403,10 @@ class ThreadResult(object):
if self.exc_info: if self.exc_info:
self.exc_info = (self.exc_info[0], self.exc_info[1], None) self.exc_info = (self.exc_info[0], self.exc_info[1], None)
def destroy(self): def destroy_in_main_thread(self):
"""
This must only be called from the thread running the hub.
"""
self.async_watcher.stop() self.async_watcher.stop()
self.async_watcher.close() self.async_watcher.close()
self.async_watcher = _FakeAsync self.async_watcher = _FakeAsync
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment