Commit 7d70244f authored by Jim Fulton's avatar Jim Fulton

Avoid waiting on futures using timeouts

On Python 2, waiting on a future result with a timeout is very
expensive. (Because conditions with timeouts are very expensive.)

The thing is, we still want to use timeouts when we're disconnected.
We don't want requests to fail right away of the disconnection is
short.

We do a bit of a dance to only use a timeout if we're disconnected.
parent e5318884
...@@ -4,8 +4,11 @@ Changelog ...@@ -4,8 +4,11 @@ Changelog
5.0.2 (unreleased) 5.0.2 (unreleased)
------------------ ------------------
- Provide much better performance on Python 2.
- Provide better error messages when pip tries to install ZEO on an - Provide better error messages when pip tries to install ZEO on an
unsupported Python version. See `issue 75 <https://github.com/zopefoundation/ZEO/issues/75>`_. unsupported Python version. See `issue 75
<https://github.com/zopefoundation/ZEO/issues/75>`_.
5.0.1 (2016-09-06) 5.0.1 (2016-09-06)
------------------ ------------------
......
...@@ -547,7 +547,7 @@ class Client(object): ...@@ -547,7 +547,7 @@ class Client(object):
def get_peername(self): def get_peername(self):
return self.protocol.get_peername() return self.protocol.get_peername()
def call_async_threadsafe(self, future, method, args): def call_async_threadsafe(self, future, _, method, args):
if self.ready: if self.ready:
self.protocol.call_async(method, args) self.protocol.call_async(method, args)
future.set_result(None) future.set_result(None)
...@@ -557,7 +557,7 @@ class Client(object): ...@@ -557,7 +557,7 @@ class Client(object):
def call_async_from_same_thread(self, method, *args): def call_async_from_same_thread(self, method, *args):
return self.protocol.call_async(method, args) return self.protocol.call_async(method, args)
def call_async_iter_threadsafe(self, future, it): def call_async_iter_threadsafe(self, future, _, it):
if self.ready: if self.ready:
self.protocol.call_async_iter(it) self.protocol.call_async_iter(it)
future.set_result(None) future.set_result(None)
...@@ -581,16 +581,19 @@ class Client(object): ...@@ -581,16 +581,19 @@ class Client(object):
else: else:
self._when_ready(func, result_future, *args) self._when_ready(func, result_future, *args)
def call_threadsafe(self, future, method, args): def call_threadsafe(self, future, wait_ready, method, args):
if self.ready: if self.ready:
self.protocol.call(future, method, args) self.protocol.call(future, method, args)
elif wait_ready:
self._when_ready(
self.call_threadsafe, future, wait_ready, method, args)
else: else:
self._when_ready(self.call_threadsafe, future, method, args) future.set_exception(ClientDisconnected())
# Special methods because they update the cache. # Special methods because they update the cache.
@future_generator @future_generator
def load_before_threadsafe(self, future, oid, tid): def load_before_threadsafe(self, future, wait_ready, oid, tid):
data = self.cache.loadBefore(oid, tid) data = self.cache.loadBefore(oid, tid)
if data is not None: if data is not None:
future.set_result(data) future.set_result(data)
...@@ -604,8 +607,11 @@ class Client(object): ...@@ -604,8 +607,11 @@ class Client(object):
if data: if data:
data, start, end = data data, start, end = data
self.cache.store(oid, start, end, data) self.cache.store(oid, start, end, data)
elif wait_ready:
self._when_ready(
self.load_before_threadsafe, future, wait_ready, oid, tid)
else: else:
self._when_ready(self.load_before_threadsafe, future, oid, tid) future.set_exception(ClientDisconnected())
@future_generator @future_generator
def _prefetch(self, oid, tid): def _prefetch(self, oid, tid):
...@@ -617,7 +623,7 @@ class Client(object): ...@@ -617,7 +623,7 @@ class Client(object):
except Exception: except Exception:
logger.exception("prefetch %r %r" % (oid, tid)) logger.exception("prefetch %r %r" % (oid, tid))
def prefetch(self, future, oids, tid): def prefetch(self, future, _, oids, tid):
if self.ready: if self.ready:
for oid in oids: for oid in oids:
if self.cache.loadBefore(oid, tid) is None: if self.cache.loadBefore(oid, tid) is None:
...@@ -628,7 +634,7 @@ class Client(object): ...@@ -628,7 +634,7 @@ class Client(object):
future.set_exception(ClientDisconnected()) future.set_exception(ClientDisconnected())
@future_generator @future_generator
def tpc_finish_threadsafe(self, future, tid, updates, f): def tpc_finish_threadsafe(self, future, _, tid, updates, f):
if self.ready: if self.ready:
try: try:
tid = yield self.protocol.fut('tpc_finish', tid) tid = yield self.protocol.fut('tpc_finish', tid)
...@@ -652,7 +658,7 @@ class Client(object): ...@@ -652,7 +658,7 @@ class Client(object):
else: else:
future.set_exception(ClientDisconnected()) future.set_exception(ClientDisconnected())
def close_threadsafe(self, future): def close_threadsafe(self, future, _):
self.close() self.close()
future.set_result(None) future.set_result(None)
...@@ -720,15 +726,30 @@ class ClientRunner(object): ...@@ -720,15 +726,30 @@ class ClientRunner(object):
def call(meth, *args, **kw): def call(meth, *args, **kw):
timeout = kw.pop('timeout', None) timeout = kw.pop('timeout', None)
assert not kw assert not kw
# Some explanation of the code below.
# Timeouts on Python 2 are expensive, so we try to avoid
# them if we're connected. The 3rd argument below is a
# wait flag. If false, and we're disconnected, we fail
# immediately. If that happens, then we try again with the
# wait flag set to True and wait with the default timeout.
result = Future() result = Future()
call_soon_threadsafe(meth, result, *args) call_soon_threadsafe(meth, result, timeout is not None, *args)
try:
return self.wait_for_result(result, timeout) return self.wait_for_result(result, timeout)
except ClientDisconnected:
if timeout is None:
result = Future()
call_soon_threadsafe(meth, result, True, *args)
return self.wait_for_result(result, self.timeout)
else:
raise
self.__call = call self.__call = call
def wait_for_result(self, future, timeout): def wait_for_result(self, future, timeout):
try: try:
return future.result(self.timeout if timeout is None else timeout) return future.result(timeout)
except concurrent.futures.TimeoutError: except concurrent.futures.TimeoutError:
if not self.client.ready: if not self.client.ready:
raise ClientDisconnected("timed out waiting for connection") raise ClientDisconnected("timed out waiting for connection")
...@@ -742,7 +763,7 @@ class ClientRunner(object): ...@@ -742,7 +763,7 @@ class ClientRunner(object):
# for tests # for tests
result = concurrent.futures.Future() result = concurrent.futures.Future()
self.loop.call_soon_threadsafe( self.loop.call_soon_threadsafe(
self.call_threadsafe, result, method, args) self.call_threadsafe, result, True, method, args)
return result return result
def async(self, method, *args): def async(self, method, *args):
...@@ -783,7 +804,7 @@ class ClientRunner(object): ...@@ -783,7 +804,7 @@ class ClientRunner(object):
self.__call = call_closed self.__call = call_closed
def apply_threadsafe(self, future, func, *args): def apply_threadsafe(self, future, _, func, *args):
try: try:
future.set_result(func(*args)) future.set_result(func(*args))
except Exception as exc: except Exception as exc:
......
...@@ -113,6 +113,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -113,6 +113,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
sized(self.encode(message_id, False, '.reply', result))) sized(self.encode(message_id, False, '.reply', result)))
def wait_for_result(self, future, timeout): def wait_for_result(self, future, timeout):
if future.done() and future.exception() is not None:
raise future.exception()
return future return future
def testClientBasics(self): def testClientBasics(self):
...@@ -145,8 +147,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -145,8 +147,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# connecting, we get an error. This is because some dufus # connecting, we get an error. This is because some dufus
# decided to create a client storage without waiting for it to # decided to create a client storage without waiting for it to
# connect. # connect.
f1 = self.call('foo', 1, 2) self.assertRaises(ClientDisconnected, self.call, 'foo', 1, 2)
self.assertTrue(isinstance(f1.exception(), ClientDisconnected))
# When the client is reconnecting, it's ready flag is set to False and # When the client is reconnecting, it's ready flag is set to False and
# it queues calls: # it queues calls:
...@@ -155,8 +156,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -155,8 +156,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertFalse(f1.done()) self.assertFalse(f1.done())
# If we try to make an async call, we get an immediate error: # If we try to make an async call, we get an immediate error:
f2 = self.async('bar', 3, 4) self.assertRaises(ClientDisconnected, self.async, 'bar', 3, 4)
self.assert_(isinstance(f2.exception(), ClientDisconnected))
# The wrapper object (ClientStorage) hasn't been notified: # The wrapper object (ClientStorage) hasn't been notified:
self.assertFalse(wrapper.notify_connected.called) self.assertFalse(wrapper.notify_connected.called)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment