Commit 54df0039 authored by Jason Madden

Optimize GeventSelector for large numbers of FDs that aren't ready at the same time.

Similar to ideas in https://github.com/gevent/gevent/pull/1523
parent a51e0937
Add ``gevent.selectors`` containing ``GeventSelector``. This selector
implementation uses gevent details to attempt to reduce overhead when
polling many file descriptors, only some of which become ready at any
given time.
This is monkey-patched as ``selectors.DefaultSelector`` by default.
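For orientation (not part of the diff): a minimal sketch of how ordinary selector-based code reaches the new selector once monkey-patched. The listening socket and addresses are illustrative assumptions.

```python
from gevent import monkey; monkey.patch_all()

import selectors
import socket

# After patch_all(), DefaultSelector is gevent's GeventSelector.
sel = selectors.DefaultSelector()

server = socket.socket()
server.bind(('127.0.0.1', 0))
server.listen()
sel.register(server, selectors.EVENT_READ, data='accept')

# Waits cooperatively on gevent's event loop; with the new selector the
# underlying watcher persists across calls instead of being rebuilt.
for key, mask in sel.select(timeout=0.1):
    if key.data == 'accept':
        conn, _ = key.fileobj.accept()
        conn.close()

sel.unregister(server)   # always unregister before closing the file
server.close()
sel.close()
```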
@@ -475,8 +475,8 @@ elif hasattr(__socket__, 'socketpair'):
     # cooperatively automatically if we're monkey-patched,
     # else we must do it ourself.
     _orig_socketpair = __socket__.socketpair
-    def socketpair(*args, **kwargs):
-        one, two = _orig_socketpair(*args, **kwargs)
+    def socketpair(family=_socket.AF_INET, type=_socket.SOCK_STREAM, proto=0):
+        one, two = _orig_socketpair(family, type, proto)
         if not isinstance(one, socket):
             one = socket(_sock=one)
             two = socket(_sock=two)
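A sketch (not part of this commit's tests) of the behavior this wrapper guarantees: both ends of a gevent `socketpair` are cooperative sockets, so blocking calls on them yield to the hub instead of blocking the whole process.

```python
import gevent
from gevent import socket

a, b = socket.socketpair()    # signature now mirrors the wrapped one

writer = gevent.spawn(a.sendall, b'ping')
data = b.recv(4)              # cooperatively waits for the data
writer.join()
assert data == b'ping'
a.close()
b.close()
```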
@@ -14,7 +14,8 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
+from collections import defaultdict

try:
    import selectors as __selectors__
except ImportError:
@@ -22,12 +23,16 @@ except ImportError:
    import selectors2 as __selectors__
    __target__ = 'selectors2'

from gevent.hub import _get_hub_noargs as get_hub
+from gevent import sleep
from gevent._compat import iteritems
+from gevent._compat import itervalues
from gevent._util import copy_globals
+from gevent._util import Lazy
-from gevent.select import poll as Poll
-from gevent.select import POLLIN
-from gevent.select import POLLOUT
+from gevent.event import Event
+from gevent.select import _EV_READ
+from gevent.select import _EV_WRITE

__implements__ = [
    'DefaultSelector',
@@ -44,7 +49,7 @@ __imports__ = copy_globals(
    dunder_names_to_keep=('__all__',)
)

-_POLL_ALL = POLLIN | POLLOUT
+_POLL_ALL = _EV_READ | _EV_WRITE

EVENT_READ = __selectors__.EVENT_READ
EVENT_WRITE = __selectors__.EVENT_WRITE
@@ -64,23 +69,69 @@ _BaseSelectorImpl = getattr(
class GeventSelector(_BaseSelectorImpl):
    """
    A selector implementation using gevent primitives.

    This is a type of :class:`selectors.BaseSelector`, so the documentation
    for that class applies here.

    .. caution::
       As the base class indicates, it is critically important to
       unregister file objects before closing them. (Or close the selector
       they are registered with before closing them.) Failure to do so
       may crash the process or have other unintended results.
    """

-    def __init__(self):
-        self._poll = Poll()
-        self._poll._get_started_watchers = self._get_started_watchers
+    # Notes on the approach:
+    #
+    # It's easy to wrap a selector implementation around
+    # ``gevent.select.poll``; in fact, that's what happens by default
+    # when monkey-patching in Python 3. But the problem with that is
+    # that each call to ``selector.select()`` creates and then destroys
+    # new kernel-level polling resources, since nothing in
+    # ``gevent.select`` can keep watchers around (the underlying
+    # file could be closed at any time). This ends up producing a large
+    # number of unnecessary syscalls.
+    #
+    # So here we take advantage of the fact that it is documented and
+    # required that files not be closed while they are registered.
+    # This lets us persist watchers. Indeed, it lets us continually
+    # accrue events in the background before a call to ``select()`` is
+    # even made. We can take advantage of this to return results
+    # immediately, without a syscall, if we already have them.
+    #
+    # We create watchers in ``register()`` and destroy them in
+    # ``unregister()``. They do not get started until the first call
+    # to ``select()``, though. Once they are started, they don't get
+    # stopped until they deliver an event.
+    #
+    # Lifecycle:
+    #   register() -> inactive_watchers
+    #   select()   -> inactive_watchers -> active_watchers;
+    #                 active_watchers   -> inactive_watchers
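A standalone sketch of the persist-and-accrue idea those notes describe, using gevent's public hub/loop watcher API. The pipe, the `ready_fds` list, and the callback name are illustrative assumptions; a POSIX pipe is assumed.

```python
import os
import gevent
from gevent.hub import get_hub

r, w = os.pipe()
ready_fds = []                 # events accrue here in the background

# 1 == read interest for loop.io (libev/libuv EV_READ)
watcher = get_hub().loop.io(r, 1)

def on_ready(fd):
    ready_fds.append(fd)       # runs in the hub's loop; just record it

# Starting the watcher is the (potentially) syscall-heavy step; once
# started it persists, rather than being torn down after every wait.
watcher.start(on_ready, r)

os.write(w, b'x')
gevent.sleep(0.01)             # yield so the loop can poll and call back
assert ready_fds == [r]

watcher.stop()                 # "inactive" again until the next select()
watcher.close()
os.close(r)
os.close(w)
```

The design point is that the expensive kernel registration happens once per fd, not once per `select()` call; results simply pile up until someone asks for them.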
+    def __init__(self, hub=None):
+        if hub is not None:
+            self.hub = hub
        # {fd: watcher}
-        self._watchers = {}
+        self._active_watchers = {}
+        self._inactive_watchers = {}
+        # {fd: EVENT_READ|EVENT_WRITE}
+        self._accumulated_events = defaultdict(int)
+        self._ready = Event()
        super(GeventSelector, self).__init__()

-    def _get_started_watchers(self, watcher_cb):
-        for fd, watcher in iteritems(self._watchers):
-            watcher.start(watcher_cb, fd, pass_events=True)
-        return list(self._watchers.values())
+    def __callback(self, events, fd):
+        if events > 0:
+            cur_event_for_fd = self._accumulated_events[fd]
+            if events & _EV_READ:
+                cur_event_for_fd |= EVENT_READ
+            if events & _EV_WRITE:
+                cur_event_for_fd |= EVENT_WRITE
+            self._accumulated_events[fd] = cur_event_for_fd
+            self._ready.set()

-    @property
-    def loop(self):
-        return self._poll.loop
+    @Lazy
+    def hub(self): # pylint:disable=method-hidden
+        return get_hub()
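`Lazy` (from `gevent._util`) acts like a cached property that stores its result in the instance dict, which is why the `self.hub = hub` assignment in `__init__` takes precedence (and why pylint's `method-hidden` check is silenced). A minimal stand-in, not gevent's actual implementation:

```python
# Illustrative cached-property stand-in for gevent._util.Lazy.
class Lazy(object):
    def __init__(self, func):
        self.func = func
        self.__name__ = func.__name__

    def __get__(self, inst, owner):
        if inst is None:
            return self
        # Compute once, then shadow the descriptor via the instance dict.
        value = self.func(inst)
        inst.__dict__[self.__name__] = value
        return value
```

Because this is a non-data descriptor (no `__set__`), an instance-dict entry always wins over it on subsequent lookups.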
    def register(self, fileobj, events, data=None):
        key = _BaseSelectorImpl.register(self, fileobj, events, data)
@@ -88,24 +139,28 @@ class GeventSelector(_BaseSelectorImpl):
        if events == _ALL_EVENTS:
            flags = _POLL_ALL
        elif events == EVENT_READ:
-            flags = POLLIN
+            flags = _EV_READ
        else:
-            flags = POLLOUT
-        self._poll.register(key.fd, flags)
+            flags = _EV_WRITE

-        loop = self.loop
+        loop = self.hub.loop
        io = loop.io
        MAXPRI = loop.MAXPRI
-        self._watchers[key.fd] = watcher = io(key.fd, self._poll.fds[key.fd])
+        self._inactive_watchers[key.fd] = watcher = io(key.fd, flags)
        watcher.priority = MAXPRI
        return key

    def unregister(self, fileobj):
        key = _BaseSelectorImpl.unregister(self, fileobj)
-        self._poll.unregister(key.fd)
-        self._watchers.pop(key.fd)
+        if key.fd in self._active_watchers:
+            watcher = self._active_watchers.pop(key.fd)
+        else:
+            watcher = self._inactive_watchers.pop(key.fd)
+        watcher.stop()
+        watcher.close()
+        self._accumulated_events.pop(key.fd, None)
        return key
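The class docstring's caution boils down to a strict ordering between `unregister()`/`close()` and closing the file itself. A hedged usage sketch of the contract:

```python
# Illustrative ordering contract: always unregister (or close the
# selector) before closing a registered file, so its watcher is
# stopped and closed while the fd is still valid.
from gevent import monkey; monkey.patch_all()
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
sel.register(a, selectors.EVENT_READ)

# Correct: unregister first, then close the socket.
sel.unregister(a)
a.close()

# Also correct: closing the selector stops all remaining watchers,
# after which registered files may be closed safely.
sel.register(b, selectors.EVENT_READ)
sel.close()
b.close()
```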
# XXX: Can we implement ``modify`` more efficiently than
@@ -114,75 +169,77 @@ class GeventSelector(_BaseSelectorImpl):
    # do that.

    def select(self, timeout=None):
+        # In https://github.com/gevent/gevent/pull/1523/, it was
+        # proposed to (essentially) keep the watchers started even
+        # after the select() call returned, *if* the watcher hadn't fired.
+        # (If it fired, it was stopped.) Watchers were started as soon as
+        # they were registered.
+        #
+        # The goal was to minimize the amount of time spent adjusting the
+        # underlying kernel (epoll) data structures as watchers are started
+        # and stopped. Events were just collected continually in the
+        # background in the hope that they would be retrieved by a future
+        # call to ``select()``. This method used an ``Event`` to communicate
+        # with the ongoing background collection of results.
+        #
+        # That becomes a problem if the file descriptor is closed while the
+        # watcher is still active. Certain backends will crash in that case.
+        # However, the selectors documentation says that files must be
+        # unregistered before closing, so that's theoretically not a concern
+        # here.
+        #
+        # Also, stopping the watchers if they fired here was said to be
+        # because "if we did not, someone could call, e.g., gevent.time.sleep
+        # and any unconsumed bytes on our watched fd would prevent the process
+        # from sleeping correctly." It's not clear to me (JAM) why that would
+        # be the case only in the ``select`` method, and not after the watcher
+        # was started in ``register()``. Actually, it's not clear why it would
+        # be a problem at any point.
+        """
+        Poll for I/O.
+
+        Note that, like the built-in selectors, this will block
+        indefinitely if no timeout is given and no files have been
+        registered.
+        """
        # timeout > 0 : block seconds
        # timeout <= 0 : No blocking.
        # timeout = None: Block forever
-        #
-        # Meanwhile, for poll():
-        # timeout None: block forever
-        # timeout omitted: block forever
-        # timeout < 0: block forever
-        # timeout anything else: block that long in *milliseconds*
-        if timeout is not None:
-            if timeout <= 0:
-                # Asked not to block.
-                timeout = 0
-            else:
-                # Convert seconds to ms.
-                # poll() has a resolution of 1 millisecond; round away from
-                # zero to wait *at least* timeout seconds.
-                timeout = math.ceil(timeout * 1e3)
-        poll_events = self._poll.poll(timeout)
+
+        # Event.wait doesn't deal with negative values.
+        if timeout is not None and timeout < 0:
+            timeout = 0
+
+        # Start any watchers that need to be started. Note that they may
+        # not actually get a chance to do anything yet if we already had
+        # events set.
+        for fd, watcher in iteritems(self._inactive_watchers):
+            watcher.start(self.__callback, fd, pass_events=True)
+        self._active_watchers.update(self._inactive_watchers)
+        self._inactive_watchers.clear()
+
+        # The _ready event is either already set (in which case
+        # there are some results waiting in _accumulated_events) or
+        # not set, in which case we have to block. But to make the two cases
+        # behave the same, we always yield to the event loop.
+        if self._ready.is_set():
+            sleep()
+        self._ready.wait(timeout)
+        self._ready.clear()
+        # TODO: If we have nothing ready, but they ask us not to block,
+        # should we make an effort to actually spin the event loop and let
+        # it check for events?

        result = []
-        for fd, event in poll_events:
+        for fd, event in iteritems(self._accumulated_events):
            key = self._key_from_fd(fd)
-            if not key:
-                continue
+            watcher = self._active_watchers.pop(fd)

-            events = 0
-            if event & POLLOUT:
-                events |= EVENT_WRITE
-            if event & POLLIN:
-                events |= EVENT_READ
-            result.append((key, events & key.events))
+            ## The below is taken without comment from
+            ## https://github.com/gevent/gevent/pull/1523/files and
+            ## hasn't been checked:
+            #
+            # Since we are emulating an epoll object within another epoll
+            # object, once a watcher has fired, we must deactivate it until
+            # poll is called next. If we did not, someone else could call,
+            # e.g., gevent.time.sleep and any unconsumed bytes on our watched
+            # fd would prevent the process from sleeping correctly.
+            watcher.stop()
+            if key:
+                result.append((key, event & key.events))
+                self._inactive_watchers[fd] = watcher
+            else: # pragma: no cover
+                # If the key was gone, then somehow we've been unregistered.
+                # Don't put it back in inactive; close it.
+                watcher.close()
+
+        self._accumulated_events.clear()
        return result
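Note the unit change: the old poll-based path converted seconds to milliseconds, while the new path hands seconds straight to `Event.wait`. A small sketch of the documented semantics, assuming a patched environment:

```python
# Illustrative: GeventSelector.select() timeout semantics.
#   timeout is None -> block until some registered fd is ready
#   timeout <= 0    -> don't block; return whatever has accumulated
#   timeout > 0     -> block up to that many *seconds* (Event.wait units)
from gevent import monkey; monkey.patch_all()
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
sel.register(b, selectors.EVENT_READ)

assert sel.select(timeout=0) == []   # nothing ready yet; returns at once

a.sendall(b'hi')
events = sel.select(timeout=1)       # seconds, not milliseconds
assert events and events[0][1] == selectors.EVENT_READ

sel.unregister(b)
a.close(); b.close()
sel.close()
```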
    def close(self):
-        self._poll = None # Nothing to do, just drop it
-        for watcher in self._watchers.values() if self._watchers else ():
+        for d in self._active_watchers, self._inactive_watchers:
+            if d is None:
+                continue # already closed
+            for watcher in itervalues(d):
                watcher.stop()
                watcher.close()
-        self._watchers = None
+        self._active_watchers = self._inactive_watchers = None
+        self._accumulated_events = None
+        self.hub = None
        _BaseSelectorImpl.close(self)
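Since the stdlib's `BaseSelector` supports the context-manager protocol (its `__exit__` calls `close()`), teardown can be made automatic; a hedged sketch:

```python
# Illustrative: leaving the with-block calls close(), which stops and
# closes every remaining watcher before dropping the hub reference.
from gevent import monkey; monkey.patch_all()
import selectors
import socket

a, b = socket.socketpair()
with selectors.DefaultSelector() as sel:
    sel.register(a, selectors.EVENT_READ)
    sel.register(b, selectors.EVENT_READ)
    # ... use sel.select(...) here ...
# Selector is closed; now the sockets themselves may be closed.
a.close()
b.close()
```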
@@ -246,4 +303,5 @@ def _gevent_do_monkey_patch(patch_request):
    # (importing 'platform' does it: platform -> subprocess -> selectors),
    # so we need to clean that up.
    if hasattr(selectors, 'PollSelector') and hasattr(selectors.PollSelector, '_selector_cls'):
-        selectors.PollSelector._selector_cls = Poll
+        from gevent.select import poll
+        selectors.PollSelector._selector_cls = poll
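A quick check of what this monkey-patch step accomplishes (illustrative; the `_selector_cls` attribute is a CPython implementation detail, hence the guards):

```python
from gevent import monkey; monkey.patch_all()
import selectors

print(selectors.DefaultSelector)   # -> gevent.selectors.GeventSelector
if hasattr(selectors, 'PollSelector') and hasattr(selectors.PollSelector, '_selector_cls'):
    print(selectors.PollSelector._selector_cls)   # -> gevent.select.poll
```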
@@ -10,10 +10,10 @@ import gevent.testing as greentest
class SelectorTestMixin(object):

    @staticmethod
-    def run_selector_once(sel):
+    def run_selector_once(sel, timeout=3):
        # Run in a background greenlet, leaving the main
        # greenlet free to send data.
-        events = sel.select(timeout=3)
+        events = sel.select(timeout=timeout)
        for key, mask in events:
            key.data(sel, key.fileobj, mask)
        gevent.sleep()
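As the comment says, the helper is meant to run in a background greenlet while the main greenlet generates traffic. A self-contained sketch of that pattern (the local `run_selector_once` mirrors the helper; the lambda stored in `data` is an illustrative assumption):

```python
import gevent
from gevent import socket
import gevent.selectors as selectors

sel = selectors.GeventSelector()
a, b = socket.socketpair()
sel.register(b, selectors.EVENT_READ,
             data=lambda sel, fobj, mask: fobj.recv(16))

def run_selector_once(sel, timeout=3):
    # Mirror of the test helper: select once, dispatch via key.data.
    for key, mask in sel.select(timeout=timeout):
        key.data(sel, key.fileobj, mask)
    gevent.sleep()

glet = gevent.spawn(run_selector_once, sel)
a.sendall(b'hello')          # the main greenlet is free to send
glet.join()

sel.unregister(b)
a.close(); b.close()
sel.close()
```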
@@ -56,8 +56,12 @@ class GeventSelectorTest(SelectorTestMixin,
        self._check_selector(sel)

    def test_select_many_sockets(self):
+        try:
+            AF_UNIX = socket.AF_UNIX
+        except AttributeError:
+            AF_UNIX = None

        pairs = [socket.socketpair() for _ in range(10)]
        clients = [s[1] for s in pairs]
        try:
            server_sel = selectors.GeventSelector()
@@ -68,12 +72,22 @@ class GeventSelectorTest(SelectorTestMixin,
                                    self.read_from_ready_socket_and_reply)
                client_sel.register(client, selectors.EVENT_READ, i)

            # Prime them all to be ready at once.
            for i, client in enumerate(clients):
                data = str(i).encode('ascii')
                client.send(data)

-            # Read and reply to all the clients
-            self.run_selector_once(server_sel)
+            # Read and reply to all the clients.
+            # Everyone should be ready, so we ask not to block.
+            # The call to gevent.idle() is there to make sure that
+            # all event loop implementations (looking at you, libuv)
+            # get a chance to poll for IO. Without it, libuv
+            # doesn't find any results here.
+            # Not blocking only works for AF_UNIX sockets, though.
+            # If we got AF_INET (Windows), the data may need some time to
+            # traverse through the layers.
+            gevent.idle()
+            self.run_selector_once(
+                server_sel,
+                timeout=-1 if pairs[0][0].family == AF_UNIX else 3)

            found = 0
            for key, _ in client_sel.select(timeout=3):
@@ -308,6 +308,7 @@ class TestDefaultSpawn(TestCase):
        with self.assertRaises(TypeError):
            self.ServerClass(self.get_listener(), backlog=25)

+    @greentest.skipOnLibuvOnCIOnPyPy("Sometimes times out")
    def test_backlog_is_accepted_for_address(self):
        self.server = self.ServerSubClass((greentest.DEFAULT_BIND_ADDR, 0), backlog=25)
        self.assertConnectionRefused()
@@ -743,8 +743,9 @@ else:
            future = self._threadpool.spawn(fn, *args, **kwargs)
            return _FutureProxy(future)

-        def shutdown(self, wait=True):
-            super(ThreadPoolExecutor, self).shutdown(wait)
+        def shutdown(self, wait=True, **kwargs): # pylint:disable=arguments-differ
+            # In 3.9, this added ``cancel_futures=False``.
+            super(ThreadPoolExecutor, self).shutdown(wait, **kwargs)
            # XXX: We don't implement wait properly
            kill = getattr(self._threadpool, 'kill', None)
            if kill: # pylint:disable=using-constant-test
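The `**kwargs` pass-through exists so Python 3.9's new `cancel_futures` flag reaches the stdlib base class unchanged. A hedged usage sketch (the import path is assumed from gevent's layout):

```python
# Illustrative: on Python 3.9+, shutdown() grew a cancel_futures flag;
# the **kwargs pass-through lets gevent's subclass accept it unchanged.
import sys
from gevent.threadpool import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
executor.submit(print, 'hello from the pool')
if sys.version_info >= (3, 9):
    executor.shutdown(wait=True, cancel_futures=False)
else:
    executor.shutdown(wait=True)
```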