Commit e6a107c5 authored by David Wilson's avatar David Wilson

core: replace Queue with Latch

On Python 2.x, operations on pthread objects with a timeout set actually
cause internal polling. When polling fails to yield a positive result,
it quickly backs off to a 50ms loop, which results in a huge amount of
latency throughout.

Instead, give up using Queue.Queue.get(timeout=...) and replace it with
the UNIX self-pipe trick. Knocks another 45% off my.yml in the Ansible
examples directory against a local VM.

This has the potential to burn a *lot* of file descriptors, but hell,
it's not the 1940s any more, RAM is all but infinite. I can live with
that.

This gets things down to around 75ms per playbook step, still hunting
for additional sources of latency.
parent 27f1eddb
...@@ -52,7 +52,6 @@ class ContextProxyService(mitogen.service.Service): ...@@ -52,7 +52,6 @@ class ContextProxyService(mitogen.service.Service):
return isinstance(args, dict) return isinstance(args, dict)
def dispatch(self, dct, msg): def dispatch(self, dct, msg):
print dct.get('via')
key = repr(sorted(dct.items())) key = repr(sorted(dct.items()))
if key not in self._context_by_id: if key not in self._context_by_id:
method = getattr(self.router, dct.pop('method')) method = getattr(self.router, dct.pop('method'))
......
...@@ -318,28 +318,6 @@ class Sender(object): ...@@ -318,28 +318,6 @@ class Sender(object):
) )
def _queue_interruptible_get(queue, timeout=None, block=True):
# bool is subclass of int, cannot use isinstance!
assert timeout is None or type(timeout) in (int, long, float)
assert isinstance(block, bool)
if timeout is not None:
timeout += time.time()
msg = None
while msg is None and (timeout is None or timeout > time.time()):
try:
msg = queue.get(block, 0.5)
except Queue.Empty:
if not block:
break
if msg is None:
raise TimeoutError('deadline exceeded.')
return msg
class Receiver(object): class Receiver(object):
notify = None notify = None
raise_channelerror = True raise_channelerror = True
...@@ -350,6 +328,7 @@ class Receiver(object): ...@@ -350,6 +328,7 @@ class Receiver(object):
self.handle = router.add_handler(self._on_receive, handle, self.handle = router.add_handler(self._on_receive, handle,
persist, respondent) persist, respondent)
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._latch = Latch()
def __repr__(self): def __repr__(self):
return 'Receiver(%r, %r)' % (self.router, self.handle) return 'Receiver(%r, %r)' % (self.router, self.handle)
...@@ -358,6 +337,7 @@ class Receiver(object): ...@@ -358,6 +337,7 @@ class Receiver(object):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._on_receive(%r)', self, msg) IOLOG.debug('%r._on_receive(%r)', self, msg)
self._queue.put(msg) self._queue.put(msg)
self._latch.wake()
if self.notify: if self.notify:
self.notify(self) self.notify(self)
...@@ -369,9 +349,9 @@ class Receiver(object): ...@@ -369,9 +349,9 @@ class Receiver(object):
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
self._latch.wait(timeout=timeout)
msg = _queue_interruptible_get(self._queue, timeout, block=block) msg = self._queue.get()
IOLOG.debug('%r.get() got %r', self, msg) #IOLOG.debug('%r.get() got %r', self, msg)
if msg == _DEAD: if msg == _DEAD:
raise ChannelError(ChannelError.local_msg) raise ChannelError(ChannelError.local_msg)
...@@ -807,7 +787,44 @@ def _unpickle_context(router, context_id, name): ...@@ -807,7 +787,44 @@ def _unpickle_context(router, context_id, name):
return router.context_class(router, context_id, name) return router.context_class(router, context_id, name)
class Waker(BasicStream): class Latch(object):
def __init__(self):
rfd, wfd = os.pipe()
set_cloexec(rfd)
set_cloexec(wfd)
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def close(self):
self.receive_side.close()
self.transmit_side.close()
__del__ = close
def wait(self, timeout=None):
while True:
rfds, _, _ = select.select([self.receive_side], [], [], timeout)
if not rfds:
return False
try:
self.receive_side.read(1)
except OSError, e:
if e[0] == errno.EWOULDBLOCK:
continue
raise
return False
def wake(self):
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
class Waker(Latch, BasicStream):
""" """
:py:class:`BasicStream` subclass implementing the :py:class:`BasicStream` subclass implementing the
`UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when `UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when
...@@ -816,34 +833,25 @@ class Waker(BasicStream): ...@@ -816,34 +833,25 @@ class Waker(BasicStream):
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
""" """
def __init__(self, broker): def __init__(self, broker):
super(Waker, self).__init__()
self._broker = broker self._broker = broker
rfd, wfd = os.pipe()
set_cloexec(rfd)
set_cloexec(wfd)
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
def __repr__(self): def __repr__(self):
return 'Waker(%r)' % (self._broker,) return 'Waker(%r)' % (self._broker,)
def on_receive(self, broker):
"""
Read a byte from the self-pipe.
"""
self.receive_side.read(256)
def wake(self): def wake(self):
""" """
Write a byte to the self-pipe, causing the IO multiplexer to wake up. Write a byte to the self-pipe, causing the IO multiplexer to wake up.
Nothing is written if the current thread is the IO multiplexer thread. Nothing is written if the current thread is the IO multiplexer thread.
""" """
IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread: if threading.currentThread() != self._broker._thread:
try: super(Waker, self).wake()
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
def on_receive(self, broker):
"""
Read a byte from the self-pipe.
"""
self.receive_side.read(256)
class IoLogger(BasicStream): class IoLogger(BasicStream):
......
...@@ -127,11 +127,13 @@ class Select(object): ...@@ -127,11 +127,13 @@ class Select(object):
self._receivers = [] self._receivers = []
self._oneshot = oneshot self._oneshot = oneshot
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._latch = mitogen.core.Latch()
for recv in receivers: for recv in receivers:
self.add(recv) self.add(recv)
def _put(self, value): def _put(self, value):
self._queue.put(value) self._queue.put(value)
self._latch.wake()
if self.notify: if self.notify:
self.notify(self) self.notify(self)
...@@ -200,7 +202,8 @@ class Select(object): ...@@ -200,7 +202,8 @@ class Select(object):
raise SelectError(self.empty_msg) raise SelectError(self.empty_msg)
while True: while True:
recv = mitogen.core._queue_interruptible_get(self._queue, timeout) self._latch.wait()
recv = self._queue.get()
try: try:
msg = recv.get(block=False) msg = recv.get(block=False)
if self._oneshot: if self._oneshot:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment