Commit 30b8094f authored by Eddi Linder's avatar Eddi Linder

Merge branch 'master' of https://github.com/gevent/gevent

parents 0ee61b32 5b455137
......@@ -45,6 +45,19 @@ Unreleased
- ``gevent.iwait`` no longer throws ``LoopExit`` if the caller
switches greenlets between return values. Reported and initial patch
in :pr:`467` by Alexey Borzenkov.
- The default threadpool and default threaded resolver work in a
forked child process, such as with ``multiprocessing.Process``.
Previously the child process would hang indefinitely. Reported in
:issue:`230` by Lx Yu.
- Fork watchers are more likely to (eventually) get called in a
multi-threaded program.
- ``gevent.killall`` accepts an arbitrary iterable for the greenlets
to kill. Reported in :issue:`404` by Martin Bachwerk; seen in
combination with older versions of simple-requests.
- ``gevent.local.local`` objects are now eligible for garbage
collection as soon as the greenlet finishes running, matching the
behaviour of the built-in ``threading.local`` (when implemented
natively). Reported in :issue:`387` by AusIV.
1.1a1 (Jun 29, 2015)
====================
......
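As an illustration of the first changelog entry above (not part of this commit), a minimal sketch of iterating ``gevent.iwait`` while switching greenlets between yielded values, the case that previously raised ``LoopExit``:

import gevent
from gevent.event import AsyncResult

sources = [AsyncResult() for _ in range(2)]
gevent.spawn(lambda: [r.set(i) for i, r in enumerate(sources)])

for obj in gevent.iwait(sources):
    # Switching out between yielded values (here via sleep) used to
    # trigger LoopExit; it is now supported.
    gevent.sleep(0.01)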
......@@ -4,11 +4,10 @@ Table Of Contents
.. toctree::
intro
reference
whatsnew_1_0
reference
changelog
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
......@@ -48,7 +48,7 @@ Being a greenlet__ subclass, :class:`Greenlet` also has ``switch()`` and ``throw
However, these should not be used at the application level. Prefer higher-level safe
classes, like :class:`Event <gevent.event.Event>` and :class:`Queue <gevent.queue.Queue>`, instead.
__ http://codespeak.net/py/0.9.2/greenlet.html
__ http://greenlet.readthedocs.org/en/latest/#instantiation
.. exception:: GreenletExit
......
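For context (not part of this commit), a minimal sketch of the recommended pattern: coordinate greenlets through ``Event`` and ``Queue`` rather than calling ``switch()``/``throw()`` directly:

import gevent
from gevent.event import Event
from gevent.queue import Queue

ready = Event()
results = Queue()

def worker():
    ready.wait()           # block until the main greenlet signals us
    results.put('done')    # hand a value back through the queue

g = gevent.spawn(worker)
ready.set()                # wake the worker without touching g.switch()
print(results.get())       # -> 'done'
g.join()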
......@@ -11,5 +11,20 @@ API reference
servers
gevent.local
gevent.monkey
gevent.core
gevent.backdoor
gevent.baseserver
gevent.event
gevent.fileobject
gevent.lock
gevent.os
gevent.pool
gevent.pywsgi
gevent.queue
gevent.select
gevent.server
gevent.socket
gevent.ssl
gevent.subprocess
gevent.thread
gevent.util
gevent.wsgi
......@@ -270,6 +270,7 @@ cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
c_flags = _flags_to_int(flags)
_check_flags(c_flags)
c_flags |= libev.EVFLAG_NOENV
c_flags |= libev.EVFLAG_FORKCHECK
if default is None:
default = True
if _default_loop_destroyed:
......
......@@ -551,6 +551,7 @@ class loop(object):
c_flags = _flags_to_int(flags)
_check_flags(c_flags)
c_flags |= libev.EVFLAG_NOENV
c_flags |= libev.EVFLAG_FORKCHECK
if default is None:
default = True
if _default_loop_destroyed:
......
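The two hunks above force ``EVFLAG_FORKCHECK`` (in addition to ``EVFLAG_NOENV``) when building the loop flags. A rough sketch of that flag-combination step, using illustrative constants that mirror libev's documented values rather than gevent's actual bindings:

# Illustrative values only; the real constants come from libev.
EVFLAG_NOENV = 0x01000000      # ignore the LIBEV_FLAGS environment variable
EVFLAG_FORKCHECK = 0x02000000  # have the loop check for a fork on every iteration

def combine_flags(user_flags=0):
    # Mirrors the pattern in loop.__init__: start from the caller's flags,
    # then force the flags gevent always wants set.
    c_flags = user_flags
    c_flags |= EVFLAG_NOENV
    c_flags |= EVFLAG_FORKCHECK
    return c_flags

assert combine_flags() & EVFLAG_FORKCHECK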
......@@ -205,7 +205,7 @@ class Greenlet(greenlet):
"""Immediatelly switch into the greenlet and raise an exception in it.
Should only be called from the HUB, otherwise the current greenlet is left unscheduled forever.
To raise an exception in a safely manner from any greenlet, use :meth:`kill`.
To raise an exception in a safe manner from any greenlet, use :meth:`kill`.
If a greenlet was started but never switched to yet, then also
a) cancel the event that will start it
......@@ -266,14 +266,26 @@ class Greenlet(greenlet):
return g
def kill(self, exception=GreenletExit, block=True, timeout=None):
"""Raise the exception in the greenlet.
"""Raise the ``exception`` in the greenlet.
If block is ``True`` (the default), wait until the greenlet dies or the optional timeout expires.
If ``block`` is ``True`` (the default), wait until the greenlet dies or the optional timeout expires.
If block is ``False``, the current greenlet is not unscheduled.
The function always returns ``None`` and never raises an error.
`Changed in version 0.13.0:` *block* is now ``True`` by default.
.. note::
Depending on what this greenlet is executing and the state of the event loop,
the exception may or may not be raised immediately when this greenlet resumes
execution. It may be raised on a subsequent green call, or, if this greenlet
exits before making such a call, it may not be raised at all. As of 1.1, an example
where the exception is raised later is if this greenlet had called ``sleep(0)``; an
example where the exception is raised immediately is if this greenlet had called ``sleep(0.1)``.
See also :func:`gevent.kill`.
.. versionchanged:: 0.13.0
*block* is now ``True`` by default.
"""
# XXX this function should not switch out if greenlet is not started but it does
# XXX fix it (will have to override 'dead' property of greenlet.greenlet)
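A small usage sketch (not part of this commit) of ``kill`` with the default ``block=True``:

import gevent
from gevent import GreenletExit

def forever():
    try:
        while True:
            gevent.sleep(0.1)
    except GreenletExit:
        return 'killed'

g = gevent.spawn(forever)
gevent.sleep(0)       # let the greenlet start
g.kill(block=True)    # raises GreenletExit in g and waits for it to die
assert g.dead
assert g.value == 'killed'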
......@@ -454,7 +466,7 @@ def joinall(greenlets, timeout=None, raise_error=False, count=None):
"""
Wait for the ``greenlets`` to finish.
:param greenlets: A sequence of greenlets to wait for.
:param greenlets: A sequence (supporting :func:`len`) of greenlets to wait for.
:keyword float timeout: If given, the maximum number of seconds to wait.
:return: A sequence of the greenlets that finished before the timeout (if any)
expired.
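A brief sketch (not part of this commit) of ``joinall`` with a timeout, relying on the documented return value above:

import gevent

def task(n):
    gevent.sleep(0.01 * n)
    return n

greenlets = [gevent.spawn(task, n) for n in range(5)]
done = gevent.joinall(greenlets, timeout=1)   # greenlets that finished in time
assert set(done) == set(greenlets)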
......@@ -496,6 +508,26 @@ def _killall(greenlets, exception):
def killall(greenlets, exception=GreenletExit, block=True, timeout=None):
"""
Forcibly terminate all the ``greenlets`` by causing them to raise ``exception``.
:param greenlets: A bounded iterable of the non-None greenlets to terminate.
*All* the items in this iterable must be greenlets that belong to the same thread.
:keyword exception: The exception to raise in the greenlets. By default this is
:class:`GreenletExit`.
:keyword bool block: If True (the default) then this function only returns when all the
greenlets are dead; the current greenlet is unscheduled during that process.
If greenlets ignore the initial exception raised in them,
then they will be joined (with :func:`gevent.joinall`) and allowed to die naturally.
If False, this function returns immediately and greenlets will raise
the exception asynchronously.
:keyword float timeout: A time in seconds to wait for greenlets to die. If given, it is
only honored when ``block`` is True.
:raises Timeout: If blocking and a timeout is given that elapses before
all the greenlets are dead.
"""
# support non-indexable containers like iterators or set objects
greenlets = list(greenlets)
if not greenlets:
return
loop = greenlets[0].loop
......
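A small sketch (not part of this commit) of the newly supported call style: passing a non-indexable iterable such as a ``set`` to ``killall``:

import gevent

workers = set(gevent.spawn(gevent.sleep, 10) for _ in range(3))
gevent.sleep(0)                                  # let the workers start
gevent.killall(workers, block=True, timeout=5)   # any bounded iterable works now
assert all(g.dead for g in workers)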
......@@ -98,11 +98,15 @@ def idle(priority=0):
def kill(greenlet, exception=GreenletExit):
"""Kill greenlet asynchronously. The current greenlet is not unscheduled.
"""
Kill greenlet asynchronously. The current greenlet is not unscheduled.
.. note::
Note, that :meth:`gevent.Greenlet.kill` method does the same and more. However,
MAIN greenlet - the one that exists initially - does not have ``kill()`` method
so you have to use this function.
The method :meth:`gevent.Greenlet.kill` method does the same and
more (and the same caveats listed there apply here). However, the MAIN
greenlet - the one that exists initially - does not have a
``kill()`` method so you have to use this function.
"""
if not greenlet.dead:
get_hub().loop.run_callback(greenlet.throw, exception)
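A usage sketch (not part of this commit) showing the asynchronous behaviour of :func:`gevent.kill`:

import gevent

victim = gevent.spawn(gevent.sleep, 10)
gevent.sleep(0)        # give the greenlet a chance to start
gevent.kill(victim)    # schedules GreenletExit; the current greenlet keeps running
victim.join()          # wait until the exception has actually been delivered
assert victim.dead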
......@@ -150,9 +154,34 @@ class signal(object):
def reinit():
    # An internal, undocumented function. Called by gevent.os.fork
    # in the child process. The loop reinit function in turn calls
    # libev's ev_loop_fork function.
    hub = _get_hub()
    if hub is not None:
        hub.loop.reinit()
        # libev's fork watchers are slow to fire because they only fire
        # at the beginning of a loop iteration; due to our use of callbacks
        # that run at the end of the loop, that may be too late. The
        # threadpool and resolvers depend on the fork handlers being
        # run (specifically, the threadpool will fail in the forked
        # child if there were any threads in it, which there will be
        # if the resolver_thread was in use (the default) before the
        # fork.)
        #
        # If the forked process tried to use the threadpool or
        # resolver immediately, it would hang.
        #
        # The below is a workaround. Fortunately, both of these
        # methods are idempotent and can be called multiple times
        # following a fork if they suddenly started working, or were
        # already working on some platforms. Other threadpools and fork handlers
        # will be called at an arbitrary time later ('soon').
        if hasattr(hub.threadpool, '_on_fork'):
            hub.threadpool._on_fork()
        # resolver_ares also has a fork watcher that's not firing
        if hasattr(hub.resolver, '_on_fork'):
            hub.resolver._on_fork()
def get_hub_class():
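The comment above relies on the ``_on_fork`` handlers being idempotent. A hedged sketch of that pid-check pattern (the class and names here are illustrative, not gevent's API):

import os

class ForkAwareResource(object):
    def __init__(self):
        self.pid = os.getpid()
        self._rebuild()

    def _rebuild(self):
        # Recreate locks/threads that do not survive fork().
        self.state = object()

    def _on_fork(self):
        pid = os.getpid()
        if pid != self.pid:   # only do the work once per fork
            self.pid = pid
            self._rebuild()

res = ForkAwareResource()
res._on_fork()   # calling it again in the same process is a no-op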
......@@ -637,7 +666,7 @@ def iwait(objects, timeout=None, count=None):
"""
Yield objects as they are ready, until all (or `count`) are ready or `timeout` expired.
:param objects: A list (supporting `len`) containing objects
:param objects: A sequence (supporting :func:`len`) containing objects
implementing the wait protocol (rawlink() and unlink()).
:param count: If not `None`, then a number specifying the maximum number
of objects to wait for.
......@@ -696,7 +725,7 @@ def wait(objects=None, timeout=None, count=None):
- all servers were stopped
- all event loop watchers were stopped.
If ``count`` is ``None`` (the default), wait for all ``object``s
If ``count`` is ``None`` (the default), wait for all ``objects``
to become ready.
If ``count`` is a number, wait for (up to) ``count`` objects to become
......
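A short sketch (not part of this commit) of ``wait`` with ``count``, using objects that implement the wait protocol:

import gevent
from gevent.event import Event

events = [Event() for _ in range(3)]

def set_some():
    events[0].set()
    events[1].set()

gevent.spawn(set_some)
ready = gevent.wait(events, count=2)   # wait for (up to) two objects to be ready
assert len(ready) == 2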
......@@ -126,112 +126,156 @@ affects what we see:
>>> del mydata
"""
from weakref import WeakKeyDictionary
from copy import copy
from weakref import ref
from contextlib import contextmanager
from gevent.hub import getcurrent, PYPY
from gevent.lock import RLock
__all__ = ["local"]
class _localbase(object):
__slots__ = '_local__args', '_local__lock', '_local__dicts'
class _wrefdict(dict):
"""A dict that can be weak referenced"""
class _localimpl(object):
"""A class managing thread-local dicts"""
__slots__ = 'key', 'dicts', 'localargs', 'locallock', '__weakref__'
def __init__(self):
# The key used in the Thread objects' attribute dicts.
# We keep it a string for speed but make it unlikely to clash with
# a "real" attribute.
self.key = '_threading_local._localimpl.' + str(id(self))
# { id(Thread) -> (ref(Thread), thread-local dict) }
self.dicts = _wrefdict()
def get_dict(self):
"""Return the dict for the current thread. Raises KeyError if none
defined."""
thread = getcurrent()
return self.dicts[id(thread)][1]
def create_dict(self):
"""Create a new dict for the current thread, and return it."""
localdict = {}
key = self.key
thread = getcurrent()
idt = id(thread)
# If we are working with a gevent.greenlet.Greenlet, we can
# pro-actively clear out with a link. Use rawlink to avoid
# spawning any more greenlets
try:
rawlink = thread.rawlink
except AttributeError:
# Otherwise we need to do it with weak refs
def local_deleted(_, key=key):
# When the localimpl is deleted, remove the thread attribute.
thread = wrthread()
if thread is not None:
del thread.__dict__[key]
def thread_deleted(_, idt=idt):
# When the thread is deleted, remove the local dict.
# Note that this is suboptimal if the thread object gets
# caught in a reference loop. We would like to be called
# as soon as the OS-level thread ends instead.
_local = wrlocal()
if _local is not None:
_local.dicts.pop(idt, None)
wrlocal = ref(self, local_deleted)
wrthread = ref(thread, thread_deleted)
thread.__dict__[key] = wrlocal
else:
wrdicts = ref(self.dicts)
def clear(_):
dicts = wrdicts()
if dicts:
dicts.pop(idt, None)
rawlink(clear)
wrthread = None
self.dicts[idt] = wrthread, localdict
return localdict
@contextmanager
def _patch(self):
impl = object.__getattribute__(self, '_local__impl')
orig_dct = object.__getattribute__(self, '__dict__')
try:
dct = impl.get_dict()
except KeyError:
# it's OK to acquire the lock here and not earlier, because the above code won't switch out
# however, subclassed __init__ might switch, so we do need to acquire the lock here
dct = impl.create_dict()
args, kw = impl.localargs
with impl.locallock:
self.__init__(*args, **kw)
with impl.locallock:
object.__setattr__(self, '__dict__', dct)
yield
object.__setattr__(self, '__dict__', orig_dct)
class local(object):
__slots__ = '_local__impl', '__dict__'
def __new__(cls, *args, **kw):
self = object.__new__(cls)
object.__setattr__(self, '_local__args', (args, kw))
object.__setattr__(self, '_local__lock', RLock())
dicts = WeakKeyDictionary()
object.__setattr__(self, '_local__dicts', dicts)
if args or kw:
if (PYPY and cls.__init__ == object.__init__) or (not PYPY and cls.__init__ is object.__init__):
raise TypeError("Initialization arguments are not supported")
# We need to create the greenlet dict in anticipation of
# __init__ being called, to make sure we don't call it again ourselves.
dict = object.__getattribute__(self, '__dict__')
dicts[getcurrent()] = dict
self = object.__new__(cls)
impl = _localimpl()
impl.localargs = (args, kw)
impl.locallock = RLock()
object.__setattr__(self, '_local__impl', impl)
# We need to create the thread dict in anticipation of
# __init__ being called, to make sure we don't call it
# again ourselves.
impl.create_dict()
return self
def _init_locals(self):
d = {}
dicts = object.__getattribute__(self, '_local__dicts')
dicts[getcurrent()] = d
object.__setattr__(self, '__dict__', d)
# we have a new instance dict, so call out __init__ if we have one
cls = type(self)
if cls.__init__ is not object.__init__:
args, kw = object.__getattribute__(self, '_local__args')
cls.__init__(self, *args, **kw)
class local(_localbase):
def __getattribute__(self, name):
d = object.__getattribute__(self, '_local__dicts').get(getcurrent())
if d is None:
# it's OK to acquire the lock here and not earlier, because the above code won't switch out
# however, subclassed __init__ might switch, so we do need to acquire the lock here
lock = object.__getattribute__(self, '_local__lock')
lock.acquire()
try:
_init_locals(self)
return object.__getattribute__(self, name)
finally:
lock.release()
else:
object.__setattr__(self, '__dict__', d)
with _patch(self):
return object.__getattribute__(self, name)
def __setattr__(self, name, value):
if name == '__dict__':
raise AttributeError("%r object attribute '__dict__' is read-only" % self.__class__.__name__)
d = object.__getattribute__(self, '_local__dicts').get(getcurrent())
if d is None:
lock = object.__getattribute__(self, '_local__lock')
lock.acquire()
try:
_init_locals(self)
return object.__setattr__(self, name, value)
finally:
lock.release()
else:
object.__setattr__(self, '__dict__', d)
raise AttributeError(
"%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
with _patch(self):
return object.__setattr__(self, name, value)
def __delattr__(self, name):
if name == '__dict__':
raise AttributeError("%r object attribute '__dict__' is read-only" % self.__class__.__name__)
d = object.__getattribute__(self, '_local__dicts').get(getcurrent())
if d is None:
lock = object.__getattribute__(self, '_local__lock')
lock.acquire()
try:
_init_locals(self)
return object.__delattr__(self, name)
finally:
lock.release()
else:
object.__setattr__(self, '__dict__', d)
raise AttributeError(
"%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
with _patch(self):
return object.__delattr__(self, name)
def __copy__(self):
currentId = getcurrent()
d = object.__getattribute__(self, '_local__dicts').get(currentId)
impl = object.__getattribute__(self, '_local__impl')
current = getcurrent()
currentId = id(current)
d = impl.get_dict()
duplicate = copy(d)
cls = type(self)
if cls.__init__ is not object.__init__:
args, kw = object.__getattribute__(self, '_local__args')
if (PYPY and cls.__init__ != object.__init__) or (not PYPY and cls.__init__ is not object.__init__):
args, kw = impl.localargs
instance = cls(*args, **kw)
else:
instance = cls()
object.__setattr__(instance, '_local__dicts', {
currentId: duplicate
})
new_impl = object.__getattribute__(instance, '_local__impl')
tpl = new_impl.dicts[currentId]
new_impl.dicts[currentId] = (tpl[0], duplicate)
return instance
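The rewritten ``gevent.local.local`` stays API-compatible with ``threading.local``; a small usage sketch (not part of this commit):

import gevent
from gevent.local import local

class Context(local):
    def __init__(self):
        # Runs once per greenlet, the first time the object is used there.
        self.request_id = None

ctx = Context()

def handle(n):
    ctx.request_id = n      # each greenlet sees its own attribute
    gevent.sleep(0)         # switching does not leak state between greenlets
    return ctx.request_id

greenlets = [gevent.spawn(handle, n) for n in range(3)]
gevent.joinall(greenlets)
assert [g.value for g in greenlets] == [0, 1, 2]
assert ctx.request_id is None   # the main greenlet's copy is untouched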
......@@ -36,6 +36,7 @@ class Resolver(object):
return '<gevent.resolver_ares.Resolver at 0x%x ares=%r>' % (id(self), self.ares)
def _on_fork(self):
# NOTE: See comment in gevent.hub.reinit.
pid = os.getpid()
if pid != self.pid:
self.hub.loop.run_callback(self.ares.destroy)
......
......@@ -86,7 +86,8 @@ class ThreadPool(GroupMappingMixin):
def _on_fork(self):
# fork() only leaves one thread; also screws up locks;
# let's re-create locks and threads
# let's re-create locks and threads.
# NOTE: See comment in gevent.hub.reinit.
pid = os.getpid()
if pid != self.pid:
self.pid = pid
......
......@@ -257,7 +257,7 @@ class TestCaseMetaClass(type):
class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
__timeout__ = 1
__timeout__ = 1 if not os.environ.get('TRAVIS') else 2 # Travis is slow and overloaded
switch_expected = 'default'
error_fatal = True
......
from __future__ import print_function
import gevent.monkey; gevent.monkey.patch_all()
import gevent
import os

hub = gevent.get_hub()
pid = os.getpid()
newpid = None


def on_fork():
    global newpid
    newpid = os.getpid()

fork_watcher = hub.loop.fork(ref=False)
fork_watcher.start(on_fork)

# Fork watchers weren't firing in multi-threaded processes.


def run():
    # libev only calls fork callbacks at the beginning of
    # the loop; we use callbacks extensively so it takes *two*
    # calls to sleep (with a timer) to actually get wrapped
    # around to the beginning of the loop.
    gevent.sleep(0.01)
    gevent.sleep(0.01)
    q.put(newpid)

import multiprocessing

# Use a thread to make us multi-threaded
hub.threadpool.apply(lambda: None)

q = multiprocessing.Queue()
p = multiprocessing.Process(target=run)
p.start()
p_val = q.get()
p.join()

assert p_val is not None
assert p_val != pid
......@@ -2,6 +2,7 @@ import time
import greentest
import gevent
from gevent import pool
from gevent.timeout import Timeout
DELAY = 0.1
......@@ -121,6 +122,36 @@ class Test(greentest.TestCase):
        s = pool.Group([p1, p2])
        s.kill()

    def test_killall_iterable_argument_non_block(self):
        p1 = GreenletSubclass.spawn(lambda: gevent.sleep(0.5))
        p2 = GreenletSubclass.spawn(lambda: gevent.sleep(0.5))
        s = set()
        s.add(p1)
        s.add(p2)
        gevent.killall(s, block=False)
        gevent.sleep(0.5)
        for g in s:
            assert g.dead

    def test_killall_iterable_argument_timeout(self):
        def f():
            try:
                gevent.sleep(1.5)
            except:
                gevent.sleep(1)
        p1 = GreenletSubclass.spawn(f)
        p2 = GreenletSubclass.spawn(f)
        s = set()
        s.add(p1)
        s.add(p2)
        try:
            gevent.killall(s, timeout=0.5)
        except Timeout:
            for g in s:
                assert not g.dead
        else:
            self.fail("Should raise timeout")


class GreenletSubclass(gevent.Greenlet):
    pass
......
import gevent.monkey
gevent.monkey.patch_all()

import socket
import multiprocessing

# Make sure that using the resolver in a forked process
# doesn't hang forever.


def block():
    socket.getaddrinfo('localhost', 8001)


def main():
    socket.getaddrinfo('localhost', 8001)

    p = multiprocessing.Process(target=block)
    p.start()
    p.join()

if __name__ == '__main__':
    main()
......@@ -21,6 +21,22 @@ class A(local):
class Obj(object):
pass
# These next two classes have to be global to avoid the leakchecks
deleted_sentinels = []
created_sentinels = []


class Sentinel(object):
    def __del__(self):
        deleted_sentinels.append(id(self))


class MyLocal(local):
    def __init__(self):
        local.__init__(self)
        self.sentinel = Sentinel()
        created_sentinels.append(id(self.sentinel))
class GeventLocalTestCase(greentest.TestCase):
......@@ -58,5 +74,36 @@ class GeventLocalTestCase(greentest.TestCase):
        self.assertNotEqual(a.path, b.path, 'The values in the two objects must be different')

    def test_locals_collected_when_greenlet_dead_but_still_referenced(self):
        # https://github.com/gevent/gevent/issues/387
        import gevent

        my_local = MyLocal()
        my_local.sentinel = None
        if greentest.PYPY:
            import gc
            gc.collect()
        del created_sentinels[:]
        del deleted_sentinels[:]

        def demonstrate_my_local():
            # Get the important parts
            getattr(my_local, 'sentinel')

        # Create and reference greenlets
        greenlets = [gevent.spawn(demonstrate_my_local) for _ in range(5)]
        gevent.sleep()

        self.assertEqual(len(created_sentinels), len(greenlets))

        for g in greenlets:
            assert g.dead
        gevent.sleep()   # let the callbacks run

        if greentest.PYPY:
            gc.collect()

        # The sentinels should be gone too
        self.assertEqual(len(deleted_sentinels), len(greenlets))
if __name__ == '__main__':
greentest.main()