Commit 48efe66c authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1081 from gevent/issue1072

Try to adhere to scheduling deadlines when running callbacks

[skip ci]
parents a609eaca 2aaf265f
......@@ -160,6 +160,16 @@ Other Changes
- gevent now uses cffi's "extern 'Python'" callbacks. These should be
faster and more stable. This requires at least cffi 1.4.0. See :issue:`1049`.
- gevent now approximately tries to stick to a scheduling interval
when running callbacks, instead of simply running a count of
callbacks. The interval is determined by
:func:`gevent.getswitchinterval`. On Python 3, this is the same as
the thread switch interval. On Python 2, this defaults to 0.005s and
can be changed with :func:`gevent.setswitchinterval`. This should
result in more fair "scheduling" of greenlets, especially when
``gevent.sleep(0)`` or other busy callbacks are in use. The interval
is checked every 50 callbacks to keep overhead low. See
:issue:`1072`. With thanks to Arcadiy Ivanov and Antonio Cuni.
libuv
-----
......@@ -239,6 +249,8 @@ libuv
libev has also been changed to follow this behaviour.
Also see :issue:`1072`.
- Timers of zero duration do not necessarily cause the event loop to
cycle, as they do in libev. Instead, they may be called
immediately. If zero duration timers are added from other zero
......
......@@ -22,24 +22,28 @@ version_info = _version_info(1, 3, 0, 'dev', 0)
__version__ = '1.3.0.dev0'
__all__ = ['get_hub',
'Greenlet',
'GreenletExit',
'spawn',
'spawn_later',
'spawn_raw',
'iwait',
'wait',
'killall',
'Timeout',
'with_timeout',
'getcurrent',
'sleep',
'idle',
'kill',
'signal',
'fork',
'reinit']
__all__ = [
'get_hub',
'Greenlet',
'GreenletExit',
'spawn',
'spawn_later',
'spawn_raw',
'iwait',
'wait',
'killall',
'Timeout',
'with_timeout',
'getcurrent',
'sleep',
'idle',
'kill',
'signal',
'fork',
'reinit',
'getswitchinterval',
'setswitchinterval',
]
import sys
......@@ -48,6 +52,26 @@ if sys.platform == 'win32':
import socket # pylint:disable=unused-import,useless-suppression
del socket
try:
# Floating point number, in number of seconds,
# like time.time
getswitchinterval = sys.getswitchinterval
setswitchinterval = sys.setswitchinterval
except AttributeError:
# Running on Python 2
_switchinterval = 0.005
def getswitchinterval():
return _switchinterval
def setswitchinterval(interval):
# Weed out None and non-numbers. This is not
# exactly exception compatible with the Python 3
# versions.
if interval > 0:
global _switchinterval
_switchinterval = interval
from gevent.hub import get_hub, iwait, wait
from gevent.greenlet import Greenlet, joinall, killall
joinall = joinall # export for pylint
......
......@@ -3,6 +3,8 @@ Basic loop implementation for ffi-based cores.
"""
# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable
from __future__ import absolute_import, print_function
from collections import deque
import sys
import os
import traceback
......@@ -13,6 +15,8 @@ from gevent._ffi import TRACE
from gevent._ffi import CRITICAL
from gevent._ffi.callback import callback
from gevent import getswitchinterval
__all__ = [
'AbstractLoop',
'assign_standard_callbacks',
......@@ -292,6 +296,7 @@ _default_loop_destroyed = False
_NOARGS = ()
CALLBACK_CHECK_COUNT = 50
class AbstractLoop(object):
# pylint:disable=too-many-public-methods,too-many-instance-attributes
......@@ -305,6 +310,8 @@ class AbstractLoop(object):
_PREPARE_POINTER = None
starting_timer_may_update_loop_time = False
def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
self._lib = lib
......@@ -312,7 +319,7 @@ class AbstractLoop(object):
self._handle_to_self = self._ffi.new_handle(self) # XXX: Reference cycle?
self._watchers = watchers
self._in_callback = False
self._callbacks = []
self._callbacks = deque()
# Stores python watcher objects while they are started
self._keepaliveset = set()
self._init_loop_and_aux_watchers(flags, default)
......@@ -379,24 +386,34 @@ class AbstractLoop(object):
def _check_callback_handle_error(self, t, v, tb):
self.handle_error(None, t, v, tb)
def _run_callbacks(self):
# libuv and libev have different signatures for their prepare/check/timer
# watchers, which is what calls this. We should probably have different methods.
count = 1000
self._stop_callback_timer()
while self._callbacks and count > 0:
callbacks = self._callbacks
self._callbacks = []
for cb in callbacks:
def _run_callbacks(self): # pylint:disable=too-many-branches
# When we're running callbacks, its safe for timers to
# update the notion of the current time (because if we're here,
# we're not running in a timer callback that may let other timers
# run; this is mostly an issue for libuv).
# That's actually a bit of a lie: on libev, self._timer0 really is
# a timer, and so sometimes this is running in a timer callback, not
# a prepare callback. But that's OK, libev doesn't suffer from cascading
# timer expiration and its safe to update the loop time at any
# moment there.
self.starting_timer_may_update_loop_time = True
try:
count = CALLBACK_CHECK_COUNT
now = self.now()
expiration = now + getswitchinterval()
self._stop_callback_timer()
while self._callbacks:
cb = self._callbacks.popleft()
count -= 1
self.unref() # XXX: libuv doesn't have a global ref count!
callback = cb.callback
cb.callback = None
args = cb.args
if callback is None or args is None:
# it's been stopped
continue
cb.callback = None
try:
callback(*args)
except: # pylint:disable=bare-except
......@@ -428,9 +445,26 @@ class AbstractLoop(object):
# the callback class so that bool(cb) of a callback that has been run
# becomes False
cb.args = None
count -= 1
if self._callbacks:
self._start_callback_timer()
# We've finished running one group of callbacks
# but we may have more, so before looping check our
# switch interval.
if count == 0 and self._callbacks:
count = CALLBACK_CHECK_COUNT
self.update_now()
if self.now() >= expiration:
now = 0
break
# Update the time before we start going again, if we didn't
# just do so.
if now != 0:
self.update_now()
if self._callbacks:
self._start_callback_timer()
finally:
self.starting_timer_may_update_loop_time = False
def _stop_aux_watchers(self):
raise NotImplementedError()
......@@ -507,7 +541,13 @@ class AbstractLoop(object):
pass
def now(self):
"Return the loop's notion of the current time."
"""
Return the loop's notion of the current time.
This may not necessarily be related to :func:`time.time` (it
may have a different starting point), but it must be expressed
in fractional seconds (the same *units* used by :func:`time.time`).
"""
raise NotImplementedError()
def update_now(self):
......
......@@ -499,8 +499,6 @@ class IoMixin(object):
class TimerMixin(object):
_watcher_type = 'timer'
update_loop_time_on_start = False
def __init__(self, loop, after=0.0, repeat=0.0, ref=True, priority=None):
if repeat < 0.0:
raise ValueError("repeat must be positive or zero: %r" % repeat)
......@@ -509,7 +507,7 @@ class TimerMixin(object):
super(TimerMixin, self).__init__(loop, ref=ref, priority=priority, args=(after, repeat))
def start(self, callback, *args, **kw):
update = kw.get("update", self.update_loop_time_on_start)
update = kw.get("update", self.loop.starting_timer_may_update_loop_time)
if update:
# Quoth the libev doc: "This is a costly operation and is
# usually done automatically within ev_run(). This
......@@ -517,9 +515,12 @@ class TimerMixin(object):
# runs for a very long time without entering the event
# loop, updating libev's idea of the current time is a
# good idea."
# 1.3 changed the default for this to False. Note that
# 1.3 changed the default for this to False *unless* the loop is
# running a callback; see libuv for details. Note that
# starting Timeout objects internally still sets this to true.
self.loop.update()
self.loop.update_now()
super(TimerMixin, self).start(callback, *args)
def again(self, callback, *args, **kw):
......
......@@ -157,6 +157,10 @@ def sleep(seconds=0, ref=True):
If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait`
from exiting.
.. versionchanged:: 1.3a1
Sleeping with a value of 0 will now be bounded to approximately block the
loop for no longer than :func:`gevent.getswitchinterval`.
.. seealso:: :func:`idle`
"""
hub = get_hub()
......
This diff is collapsed.
......@@ -199,17 +199,6 @@ _default_loop_destroyed = False
from gevent._ffi.loop import AbstractLoop
# from gevent.libev.watcher import watcher
# from gevent.libev.watcher import io
# from gevent.libev.watcher import timer
# from gevent.libev.watcher import signal
# from gevent.libev.watcher import idle
# from gevent.libev.watcher import prepare
# from gevent.libev.watcher import check
# from gevent.libev.watcher import fork
# from gevent.libev.watcher import async
# from gevent.libev.watcher import child
# from gevent.libev.watcher import stat
from gevent.libev import watcher as _watchers
_events_to_str = _watchers._events_to_str # exported
......
......@@ -146,7 +146,9 @@ cdef extern from "libev.h" nogil:
unsigned int ev_recommended_backends()
unsigned int ev_embeddable_backends()
double ev_time()
ctypedef double ev_tstamp
ev_tstamp ev_time()
void ev_set_syserr_cb(void *)
int ev_priority(void*)
......@@ -209,7 +211,7 @@ cdef extern from "libev.h" nogil:
void ev_verify(ev_loop*)
void ev_run(ev_loop*, int flags) nogil
double ev_now(ev_loop*)
ev_tstamp ev_now(ev_loop*)
void ev_now_update(ev_loop*)
void ev_ref(ev_loop*)
......
......@@ -177,9 +177,6 @@ class io(_base.IoMixin, watcher):
class timer(_base.TimerMixin, watcher):
def _update_now(self):
libev.ev_now_update(self.loop._ptr)
@property
def at(self):
return self._watcher.at
......
......@@ -382,7 +382,11 @@ class loop(AbstractLoop):
return libuv.uv_run(self._ptr, mode)
def now(self):
return libuv.uv_now(self._ptr)
# libuv's now is expressed as an integer number of
# milliseconds, so to get it compatible with time.time units
# that this method is supposed to return, we have to divide by 1000.0
now = libuv.uv_now(self._ptr)
return now / 1000.0
def update_now(self):
libuv.uv_update_time(self._ptr)
......
......@@ -618,8 +618,6 @@ class timer(_base.TimerMixin, watcher):
# This ensures the loop cycles. Of course, the 'again' method does
# nothing on them and doesn't exist. In practice that's not an issue.
update_loop_time_on_start = False
_again = False
def _watcher_ffi_init(self, args):
......
......@@ -5,6 +5,7 @@ import gevent
from gevent import os
from greentest import TestCase, main, LARGE_TIMEOUT
from gevent import Greenlet, joinall
from greentest.skipping import skipOnLibuvOnPyPyOnWin
class TestOS_tp(TestCase):
......@@ -20,6 +21,7 @@ class TestOS_tp(TestCase):
def write(self, *args):
return os.tp_write(*args)
@skipOnLibuvOnPyPyOnWin("Sometimes times out")
def _test_if_pipe_blocks(self, buffer_class):
r, w = self.pipe()
# set nbytes such that for sure it is > maximum pipe buffer
......
......@@ -295,6 +295,15 @@ def final_sleep():
TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
SMALL_RANGE = 10
LARGE_RANGE = 1000
if greentest.PYPY and greentest.WIN:
# See comments in test__threadpool.py.
LARGE_RANGE = 50
elif greentest.RUNNING_ON_CI or greentest.EXPECT_POOR_TIMER_RESOLUTION:
LARGE_RANGE = 100
class TestPool(greentest.TestCase):
__timeout__ = greentest.LARGE_TIMEOUT
size = 1
......@@ -313,7 +322,7 @@ class TestPool(greentest.TestCase):
def test_map(self):
pmap = self.pool.map
self.assertEqual(pmap(sqr, range(10)), list(map(squared, range(10))))
self.assertEqual(pmap(sqr, range(SMALL_RANGE)), list(map(squared, range(SMALL_RANGE))))
self.assertEqual(pmap(sqr, range(100)), list(map(squared, range(100))))
def test_async(self):
......@@ -328,7 +337,7 @@ class TestPool(greentest.TestCase):
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT1, 1)
gevent.sleep(0) # let's the callback run
gevent.sleep(0) # lets the callback run
assert result == [49], result
def test_async_timeout(self):
......@@ -338,34 +347,36 @@ class TestPool(greentest.TestCase):
self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT2, 1)
self.pool.join()
def test_imap(self):
it = self.pool.imap(sqr, range(10))
self.assertEqual(list(it), list(map(squared, range(10))))
def test_imap_list_small(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
self.assertEqual(list(it), list(map(sqr, range(SMALL_RANGE))))
it = self.pool.imap(sqr, range(10))
for i in range(10):
def test_imap_it_small(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
for i in range(SMALL_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
it = self.pool.imap(sqr, range(1000))
for i in range(1000):
def test_imap_it_large(self):
it = self.pool.imap(sqr, range(LARGE_RANGE))
for i in range(LARGE_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
def test_imap_random(self):
it = self.pool.imap(sqr_random_sleep, range(10))
self.assertEqual(list(it), list(map(squared, range(10))))
it = self.pool.imap(sqr_random_sleep, range(SMALL_RANGE))
self.assertEqual(list(it), list(map(squared, range(SMALL_RANGE))))
def test_imap_unordered(self):
it = self.pool.imap_unordered(sqr, range(1000))
self.assertEqual(sorted(it), list(map(squared, range(1000))))
it = self.pool.imap_unordered(sqr, range(LARGE_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(LARGE_RANGE))))
it = self.pool.imap_unordered(sqr, range(1000))
self.assertEqual(sorted(it), list(map(squared, range(1000))))
it = self.pool.imap_unordered(sqr, range(LARGE_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(LARGE_RANGE))))
def test_imap_unordered_random(self):
it = self.pool.imap_unordered(sqr_random_sleep, range(10))
self.assertEqual(sorted(it), list(map(squared, range(10))))
it = self.pool.imap_unordered(sqr_random_sleep, range(SMALL_RANGE))
self.assertEqual(sorted(it), list(map(squared, range(SMALL_RANGE))))
def test_empty(self):
it = self.pool.imap_unordered(sqr, [])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment