Commit 629b0667 authored by Jim Fulton's avatar Jim Fulton

Removed the "sync" mode for ClientStorage. Previously, a

ClientStorage could be in either "sync" mode or "async" mode.  Now
there is just "async" mode.  There is now a dedicicated asyncore main
loop dedicated to ZEO clients.

This addresses a test failure on Mac OS X,
http://www.zope.org/Collectors/Zope3-dev/650, that I believe was due
to a bug in sync mode. Some asyncore-based code was being called from
multiple threads that didn't expect to be.

Converting to always-async mode revealed some bugs that weren't caught
before because the tests ran in sync mode.  These problems could
explain some problems we've seen at times with clients taking a long
time to reconnect after a disconnect.

Added a partial heart beat to try to detect lost connections that
aren't otherwise caught,
http://mail.zope.org/pipermail/zodb-dev/2005-June/008951.html, by
perioidically writing to all connections during periods of inactivity.
parent bf49338b
What's new in ZODB3 3.7a1? What's new on ZODB 3.7b2?
========================== =========================
Release date: DD-MMM-200Y
ClientStorage
-------------
- (3.7b2) Removed the "sync" mode for ClientStorage.
Previously, a ClientStorage could be in either "sync" mode or "async"
mode. Now there is just "async" mode. There is now a dedicicated
asyncore main loop dedicated to ZEO clients.
Applications no-longer need to run an asyncore main loop to cause
client storages to run in async mode. Even if an application runs an
asyncore main loop, it is independent of the loop used by client
storages.
Following is combined news from internal releases (to support ongoing This addresses a test failure on Mac OS X,
Zope development). These are the dates of the internal releases: http://www.zope.org/Collectors/Zope3-dev/650, that I believe was due
to a bug in sync mode. Some asyncore-based code was being called from
multiple threads that didn't expect to be.
- 3.7a1 DD-MMM-200Y Converting to always-async mode revealed some bugs that weren't caught
before because the tests ran in sync mode. These problems could
explain some problems we've seen at times with clients taking a long
time to reconnect after a disconnect.
Added a partial heart beat to try to detect lost connections that
aren't otherwise caught,
http://mail.zope.org/pipermail/zodb-dev/2005-June/008951.html, by
perioidically writing to all connections during periods of inactivity.
Connection management Connection management
--------------------- ---------------------
...@@ -34,6 +56,14 @@ IPersistent ...@@ -34,6 +56,14 @@ IPersistent
- (3.7a1) The documentation for ``_p_oid`` now specifies the concrete - (3.7a1) The documentation for ``_p_oid`` now specifies the concrete
type of oids (in short, an oid is either None or a non-empty string). type of oids (in short, an oid is either None or a non-empty string).
Testing
-------
- (3.7b2) Fixed test-runner output truncation.
A bug was fixed in the test runner that caused result summaries to be
omitted when running on Windows.
Tools Tools
----- -----
......
...@@ -339,43 +339,16 @@ class ClientStorage(object): ...@@ -339,43 +339,16 @@ class ClientStorage(object):
# still be going on. This code must wait until validation # still be going on. This code must wait until validation
# finishes, but if the connection isn't a zrpc async # finishes, but if the connection isn't a zrpc async
# connection it also needs to poll for input. # connection it also needs to poll for input.
if self._connection.is_async(): assert self._connection.is_async()
while 1: while 1:
self._ready.wait(30) self._ready.wait(30)
if self._ready.isSet(): if self._ready.isSet():
break break
if timeout and time.time() > deadline: if timeout and time.time() > deadline:
log2("Timed out waiting for connection", log2("Timed out waiting for connection",
level=logging.WARNING) level=logging.WARNING)
break
log2("Waiting for cache verification to finish")
else:
self._wait_sync(deadline)
def _wait_sync(self, deadline=None):
# Log no more than one "waiting" message per LOG_THROTTLE seconds.
LOG_THROTTLE = 300 # 5 minutes
next_log_time = time.time()
while not self._ready.isSet():
now = time.time()
if deadline and now > deadline:
log2("Timed out waiting for connection", level=logging.WARNING)
break break
if now >= next_log_time: log2("Waiting for cache verification to finish")
log2("Waiting for cache verification to finish")
next_log_time = now + LOG_THROTTLE
if self._connection is None:
# If the connection was closed while we were
# waiting for it to become ready, start over.
if deadline is None:
timeout = None
else:
timeout = deadline - now
return self._wait(timeout)
# No mainloop ia running, so we need to call something fancy to
# handle asyncore events.
self._connection.pending(30)
def close(self): def close(self):
"""Storage API: finalize the storage, releasing external resources.""" """Storage API: finalize the storage, releasing external resources."""
...@@ -403,17 +376,8 @@ class ClientStorage(object): ...@@ -403,17 +376,8 @@ class ClientStorage(object):
return self._ready.isSet() return self._ready.isSet()
def sync(self): def sync(self):
"""Handle any pending invalidation messages. # The separate async thread should keep us up to date
pass
This is called by the sync method in ZODB.Connection.
"""
# If there is no connection, return immediately. Technically,
# there are no pending invalidations so they are all handled.
# There doesn't seem to be much benefit to raising an exception.
cn = self._connection
if cn is not None:
cn.pending()
def doAuth(self, protocol, stub): def doAuth(self, protocol, stub):
if not (self._username and self._password): if not (self._username and self._password):
...@@ -517,11 +481,17 @@ class ClientStorage(object): ...@@ -517,11 +481,17 @@ class ClientStorage(object):
stub = self.StorageServerStubClass(conn) stub = self.StorageServerStubClass(conn)
self._oids = [] self._oids = []
self._info.update(stub.get_info())
self.verify_cache(stub) self.verify_cache(stub)
if not conn.is_async():
log2("Waiting for cache verification to finish") # It's important to call get_info after calling verify_cache.
self._wait_sync() # If we end up doing a full-verification, we need to wait till
# it's done. By doing a synchonous call, we are guarenteed
# that the verification will be done because operations are
# handled in order.
self._info.update(stub.get_info())
assert conn.is_async()
self._handle_extensions() self._handle_extensions()
def _handle_extensions(self): def _handle_extensions(self):
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
############################################################################## ##############################################################################
"""RPC stubs for interface exported by StorageServer.""" """RPC stubs for interface exported by StorageServer."""
import time
## ##
# ZEO storage server. # ZEO storage server.
# <p> # <p>
...@@ -44,9 +46,11 @@ class StorageServer: ...@@ -44,9 +46,11 @@ class StorageServer:
zrpc.connection.Connection class. zrpc.connection.Connection class.
""" """
self.rpc = rpc self.rpc = rpc
# Wait until we know what version the other side is using. # Wait until we know what version the other side is using.
while rpc.peer_protocol_version is None: while rpc.peer_protocol_version is None:
rpc.pending() time.sleep(0.1)
if rpc.peer_protocol_version == 'Z200': if rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None self.getInvalidations = lambda tid: None
......
...@@ -35,10 +35,9 @@ class WorkerThread(TestThread): ...@@ -35,10 +35,9 @@ class WorkerThread(TestThread):
# run the entire test in a thread so that the blocking call for # run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite. # tpc_vote() doesn't hang the test suite.
def __init__(self, storage, trans, method="tpc_finish"): def __init__(self, storage, trans):
self.storage = storage self.storage = storage
self.trans = trans self.trans = trans
self.method = method
self.ready = threading.Event() self.ready = threading.Event()
TestThread.__init__(self) TestThread.__init__(self)
...@@ -52,10 +51,7 @@ class WorkerThread(TestThread): ...@@ -52,10 +51,7 @@ class WorkerThread(TestThread):
p = zodb_pickle(MinPO("c")) p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans) self.storage.store(oid, ZERO, p, '', self.trans)
self.myvote() self.myvote()
if self.method == "tpc_finish": self.storage.tpc_finish(self.trans)
self.storage.tpc_finish(self.trans)
else:
self.storage.tpc_abort(self.trans)
except ClientDisconnected: except ClientDisconnected:
pass pass
...@@ -120,7 +116,7 @@ class CommitLockTests: ...@@ -120,7 +116,7 @@ class CommitLockTests:
t.start() t.start()
t.ready.wait() t.ready.wait()
# Close on the connections abnormally to test server response # Close one of the connections abnormally to test server response
if i == 0: if i == 0:
storage.close() storage.close()
else: else:
...@@ -237,7 +233,6 @@ class CommitLockUndoTests(CommitLockTests): ...@@ -237,7 +233,6 @@ class CommitLockUndoTests(CommitLockTests):
trans_id = self._get_trans_id() trans_id = self._get_trans_id()
oid, txn = self._start_txn() oid, txn = self._start_txn()
msgid = self._begin_undo(trans_id, txn) msgid = self._begin_undo(trans_id, txn)
self._begin_threads() self._begin_threads()
self._finish_undo(msgid) self._finish_undo(msgid)
......
...@@ -55,6 +55,12 @@ class TestClientStorage(ClientStorage): ...@@ -55,6 +55,12 @@ class TestClientStorage(ClientStorage):
StorageServerStubClass = TestServerStub StorageServerStubClass = TestServerStub
connection_count_for_tests = 0
def notifyConnected(self, conn):
ClientStorage.notifyConnected(self, conn)
self.connection_count_for_tests += 1
def verify_cache(self, stub): def verify_cache(self, stub):
self.end_verify = threading.Event() self.end_verify = threading.Event()
self.verify_result = ClientStorage.verify_cache(self, stub) self.verify_result = ClientStorage.verify_cache(self, stub)
...@@ -959,40 +965,39 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -959,40 +965,39 @@ class TimeoutTests(CommonSetupTearDown):
storage.close() storage.close()
def checkTimeoutAfterVote(self): def checkTimeoutAfterVote(self):
raises = self.assertRaises
unless = self.failUnless
self._storage = storage = self.openClientStorage() self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty # Assert that the zeo cache is empty
unless(not list(storage._cache.contents())) self.assert_(not list(storage._cache.contents()))
# Create the object # Create the object
oid = storage.new_oid() oid = storage.new_oid()
obj = MinPO(7) obj = MinPO(7)
# Now do a store, sleeping before the finish so as to cause a timeout # Now do a store, sleeping before the finish so as to cause a timeout
t = Transaction() t = Transaction()
old_connection_count = storage.connection_count_for_tests
storage.tpc_begin(t) storage.tpc_begin(t)
revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t) revid1 = storage.store(oid, ZERO, zodb_pickle(obj), '', t)
storage.tpc_vote(t) storage.tpc_vote(t)
# Now sleep long enough for the storage to time out # Now sleep long enough for the storage to time out
time.sleep(3) time.sleep(3)
storage.sync() self.assert_(
unless(not storage.is_connected()) (not storage.is_connected())
or
(storage.connection_count_for_tests > old_connection_count)
)
storage._wait() storage._wait()
unless(storage.is_connected()) self.assert_(storage.is_connected())
# We expect finish to fail # We expect finish to fail
raises(ClientDisconnected, storage.tpc_finish, t) self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
# The cache should still be empty # The cache should still be empty
unless(not list(storage._cache.contents())) self.assert_(not list(storage._cache.contents()))
# Load should fail since the object should not be in either the cache # Load should fail since the object should not be in either the cache
# or the server. # or the server.
raises(KeyError, storage.load, oid, '') self.assertRaises(KeyError, storage.load, oid, '')
def checkTimeoutProvokingConflicts(self): def checkTimeoutProvokingConflicts(self):
eq = self.assertEqual
raises = self.assertRaises
require = self.assert_
self._storage = storage = self.openClientStorage() self._storage = storage = self.openClientStorage()
# Assert that the zeo cache is empty. # Assert that the zeo cache is empty.
require(not list(storage._cache.contents())) self.assert_(not list(storage._cache.contents()))
# Create the object # Create the object
oid = storage.new_oid() oid = storage.new_oid()
obj = MinPO(7) obj = MinPO(7)
...@@ -1007,6 +1012,7 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1007,6 +1012,7 @@ class TimeoutTests(CommonSetupTearDown):
# Now do a store, sleeping before the finish so as to cause a timeout. # Now do a store, sleeping before the finish so as to cause a timeout.
obj.value = 8 obj.value = 8
t = Transaction() t = Transaction()
old_connection_count = storage.connection_count_for_tests
storage.tpc_begin(t) storage.tpc_begin(t)
revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t) revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t)
revid2b = storage.tpc_vote(t) revid2b = storage.tpc_vote(t)
...@@ -1020,17 +1026,21 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1020,17 +1026,21 @@ class TimeoutTests(CommonSetupTearDown):
# of 3). # of 3).
deadline = time.time() + 60 # wait up to a minute deadline = time.time() + 60 # wait up to a minute
while time.time() < deadline: while time.time() < deadline:
if storage.is_connected(): if (storage.is_connected() and
(storage.connection_count_for_tests == old_connection_count)
):
time.sleep(self.timeout / 1.8) time.sleep(self.timeout / 1.8)
storage.sync()
else: else:
break break
storage.sync() self.assert_(
require(not storage.is_connected()) (not storage.is_connected())
or
(storage.connection_count_for_tests > old_connection_count)
)
storage._wait() storage._wait()
require(storage.is_connected()) self.assert_(storage.is_connected())
# We expect finish to fail. # We expect finish to fail.
raises(ClientDisconnected, storage.tpc_finish, t) self.assertRaises(ClientDisconnected, storage.tpc_finish, t)
# Now we think we've committed the second transaction, but we really # Now we think we've committed the second transaction, but we really
# haven't. A third one should produce a POSKeyError on the server, # haven't. A third one should produce a POSKeyError on the server,
# which manifests as a ConflictError on the client. # which manifests as a ConflictError on the client.
...@@ -1038,7 +1048,7 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1038,7 +1048,7 @@ class TimeoutTests(CommonSetupTearDown):
t = Transaction() t = Transaction()
storage.tpc_begin(t) storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t) storage.store(oid, revid2, zodb_pickle(obj), '', t)
raises(ConflictError, storage.tpc_vote, t) self.assertRaises(ConflictError, storage.tpc_vote, t)
# Even aborting won't help. # Even aborting won't help.
storage.tpc_abort(t) storage.tpc_abort(t)
storage.tpc_finish(t) storage.tpc_finish(t)
...@@ -1048,7 +1058,7 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1048,7 +1058,7 @@ class TimeoutTests(CommonSetupTearDown):
storage.tpc_begin(t) storage.tpc_begin(t)
storage.store(oid, revid2, zodb_pickle(obj), '', t) storage.store(oid, revid2, zodb_pickle(obj), '', t)
# Even aborting won't help. # Even aborting won't help.
raises(ConflictError, storage.tpc_vote, t) self.assertRaises(ConflictError, storage.tpc_vote, t)
# Abort this one and try a transaction that should succeed. # Abort this one and try a transaction that should succeed.
storage.tpc_abort(t) storage.tpc_abort(t)
storage.tpc_finish(t) storage.tpc_finish(t)
...@@ -1062,8 +1072,8 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1062,8 +1072,8 @@ class TimeoutTests(CommonSetupTearDown):
storage.tpc_finish(t) storage.tpc_finish(t)
# Now load the object and verify that it has a value of 11. # Now load the object and verify that it has a value of 11.
data, revid = storage.load(oid, '') data, revid = storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(11)) self.assertEqual(zodb_unpickle(data), MinPO(11))
eq(revid, revid2) self.assertEqual(revid, revid2)
class MSTThread(threading.Thread): class MSTThread(threading.Thread):
......
...@@ -95,10 +95,11 @@ class AuthTest(CommonSetupTearDown): ...@@ -95,10 +95,11 @@ class AuthTest(CommonSetupTearDown):
def testUnauthenticatedMessage(self): def testUnauthenticatedMessage(self):
# Test that an unauthenticated message is rejected by the server # Test that an unauthenticated message is rejected by the server
# if it was sent after the connection was authenticated. # if it was sent after the connection was authenticated.
# Sleep for 0.2 seconds to give the server some time to start up
# seems to be needed before and after creating the storage
self._storage = self.openClientStorage(wait=0, username="foo", self._storage = self.openClientStorage(wait=0, username="foo",
password="bar", realm=self.realm) password="bar", realm=self.realm)
# Sleep for 0.2 seconds to give the server some time to start up
# seems to be needed before and after creating the storage
self.wait() self.wait()
self._storage.versions() self._storage.versions()
# Manually clear the state of the hmac connection # Manually clear the state of the hmac connection
......
...@@ -14,13 +14,15 @@ ...@@ -14,13 +14,15 @@
"""Test suite for ZEO based on ZODB.tests.""" """Test suite for ZEO based on ZODB.tests."""
# System imports # System imports
import asyncore
import logging
import os import os
import random import random
import signal
import socket import socket
import asyncore
import tempfile import tempfile
import time
import unittest import unittest
import logging
# ZODB test support # ZODB test support
import ZODB import ZODB
...@@ -36,8 +38,13 @@ from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \ ...@@ -36,8 +38,13 @@ from ZODB.tests import StorageTestBase, BasicStorage, VersionStorage, \
from ZODB.tests.testDemoStorage import DemoStorageWrappedBase from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
from ZEO.ClientStorage import ClientStorage from ZEO.ClientStorage import ClientStorage
import ZEO.zrpc.connection
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
import ZEO.tests.ConnectionTests
logger = logging.getLogger('ZEO.tests.testZEO') logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB: class DummyDB:
...@@ -70,14 +77,19 @@ class MiscZEOTests: ...@@ -70,14 +77,19 @@ class MiscZEOTests:
self.assertEqual(zodb_unpickle(data), MinPO('first')) self.assertEqual(zodb_unpickle(data), MinPO('first'))
self.assertEqual(serial, revid1) self.assertEqual(serial, revid1)
revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1) revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
for n in range(3):
# Let the server and client talk for a moment. # Now, storage 2 should eventually get the new data. It
# Is there a better way to do this? # will take some time, although hopefully not much.
asyncore.poll(0.1) # We'll poll till we get it and whine if we time out:
data, serial = storage2.load(oid, '') for n in range(30):
self.assertEqual(zodb_unpickle(data), MinPO('second'), time.sleep(.1)
'Invalidation message was not sent!') data, serial = storage2.load(oid, '')
self.assertEqual(serial, revid2) if (serial == revid2 and
zodb_unpickle(data) == MinPO('second')
):
break
else:
raise AssertionError('Invalidation message was not sent!')
finally: finally:
storage2.close() storage2.close()
...@@ -198,6 +210,67 @@ class MappingStorageTests(GenericTests): ...@@ -198,6 +210,67 @@ class MappingStorageTests(GenericTests):
def getConfig(self): def getConfig(self):
return """<mappingstorage 1/>""" return """<mappingstorage 1/>"""
class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
"""Make sure a heartbeat is being sent and that it does no harm
This is really hard to test properly because we can't see the data
flow between the client and server and we can't really tell what's
going on in the server very well. :(
"""
def setUp(self):
# Crank down the select frequency
self.__old_client_timeout = ZEO.zrpc.connection.client_timeout
ZEO.zrpc.connection.client_timeout = 0.1
ZEO.zrpc.connection.client_trigger.pull_trigger()
ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)
def tearDown(self):
ZEO.zrpc.connection.client_timeout = self.__old_client_timeout
ZEO.zrpc.connection.client_trigger.pull_trigger()
ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)
def getConfig(self, path, create, read_only):
return """<mappingstorage 1/>"""
def checkHeartbeatWithServerClose(self):
# This is a minimal test that mainly tests that the heartbeat
# function does no harm.
client_timeout_count = ZEO.zrpc.connection.client_timeout_count
self._storage = self.openClientStorage()
time.sleep(1) # allow some time for the select loop to fire a few times
self.assert_(ZEO.zrpc.connection.client_timeout_count
> client_timeout_count)
self._dostore()
if hasattr(os, 'kill'):
# Kill server violently, in hopes of provoking problem
os.kill(self._pids[0], signal.SIGKILL)
self._servers[0] = None
else:
self.shutdownServer()
for i in range(91):
# wait for disconnection
if not self._storage.is_connected():
break
time.sleep(0.1)
else:
raise AssertionError("Didn't detect server shutdown in 5 seconds")
def checkHeartbeatWithClientClose(self):
# This is a minimal test that mainly tests that the heartbeat
# function does no harm.
client_timeout_count = ZEO.zrpc.connection.client_timeout_count
self._storage = self.openClientStorage()
self._storage.close()
time.sleep(1) # allow some time for the select loop to fire a few times
self.assert_(ZEO.zrpc.connection.client_timeout_count
> client_timeout_count)
class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase): class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
def getConfig(self): def getConfig(self):
...@@ -233,6 +306,7 @@ test_classes = [OneTimeTests, ...@@ -233,6 +306,7 @@ test_classes = [OneTimeTests,
FileStorageTests, FileStorageTests,
MappingStorageTests, MappingStorageTests,
DemoStorageWrappedAroundClientStorage, DemoStorageWrappedAroundClientStorage,
HeartbeatTests,
] ]
def test_suite(): def test_suite():
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
import asyncore
import errno import errno
import select import select
import socket import socket
...@@ -20,13 +21,11 @@ import time ...@@ -20,13 +21,11 @@ import time
import types import types
import logging import logging
import ThreadedAsync
from ZODB.POSException import ReadOnlyError from ZODB.POSException import ReadOnlyError
from ZODB.loglevels import BLATHER from ZODB.loglevels import BLATHER
from ZEO.zrpc.log import log from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger import ZEO.zrpc.trigger
from ZEO.zrpc.connection import ManagedClientConnection from ZEO.zrpc.connection import ManagedClientConnection
class ConnectionManager(object): class ConnectionManager(object):
...@@ -43,9 +42,6 @@ class ConnectionManager(object): ...@@ -43,9 +42,6 @@ class ConnectionManager(object):
# If thread is not None, then there is a helper thread # If thread is not None, then there is a helper thread
# attempting to connect. # attempting to connect.
self.thread = None # Protected by self.cond self.thread = None # Protected by self.cond
self.trigger = None
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self): def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addrlist) return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
...@@ -85,7 +81,6 @@ class ConnectionManager(object): ...@@ -85,7 +81,6 @@ class ConnectionManager(object):
def close(self): def close(self):
"""Prevent ConnectionManager from opening new connections""" """Prevent ConnectionManager from opening new connections"""
self.closed = 1 self.closed = 1
ThreadedAsync.remove_loop_callback(self.set_async)
self.cond.acquire() self.cond.acquire()
try: try:
t = self.thread t = self.thread
...@@ -103,29 +98,6 @@ class ConnectionManager(object): ...@@ -103,29 +98,6 @@ class ConnectionManager(object):
if conn is not None: if conn is not None:
# This will call close_conn() below which clears self.connection # This will call close_conn() below which clears self.connection
conn.close() conn.close()
if self.trigger is not None:
self.trigger.close()
self.trigger = None
ThreadedAsync.remove_loop_callback(self.set_async)
def set_async(self, map):
# This is the callback registered with ThreadedAsync. The
# callback might be called multiple times, so it shouldn't
# create a trigger every time and should never do anything
# after it's closed.
# It may be that the only case where it is called multiple
# times is in the test suite, where ThreadedAsync's loop can
# be started in a child process after a fork. Regardless,
# it's good to be defensive.
# We need each connection started with async==0 to have a
# callback.
log("CM.set_async(%s)" % repr(map), level=logging.DEBUG)
if not self.closed and self.trigger is None:
log("CM.set_async(): first call")
self.trigger = trigger()
self.thr_async = 1 # needs to be set on the Connection
def attempt_connect(self): def attempt_connect(self):
"""Attempt a connection to the server without blocking too long. """Attempt a connection to the server without blocking too long.
......
...@@ -19,6 +19,8 @@ import threading ...@@ -19,6 +19,8 @@ import threading
import types import types
import logging import logging
import traceback, time
import ThreadedAsync import ThreadedAsync
from ZEO.zrpc import smac from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError from ZEO.zrpc.error import ZRPCError, DisconnectedError
...@@ -30,6 +32,89 @@ from ZODB.loglevels import BLATHER, TRACE ...@@ -30,6 +32,89 @@ from ZODB.loglevels import BLATHER, TRACE
REPLY = ".reply" # message name used for replies REPLY = ".reply" # message name used for replies
ASYNC = 1 ASYNC = 1
##############################################################################
# Dedicated Client select loop:
client_map = {}
client_trigger = trigger(client_map)
client_timeout = 30.0
client_timeout_count = 0 # for testing
def client_loop():
map = client_map
logger = logging.getLogger('ZEO.zrpc.client_loop')
logger.addHandler(logging.StreamHandler())
read = asyncore.read
write = asyncore.write
_exception = asyncore._exception
while map:
try:
r = e = list(client_map)
w = [fd for (fd, obj) in map.iteritems() if obj.writable()]
try:
r, w, e = select.select(r, w, e, client_timeout)
except select.error, err:
if err[0] != errno.EINTR:
if err[0] == errno.EBADF:
# If a connection is closed while we are
# calling select on it, we can get a bad
# file-descriptor error. We'll check for this
# case by looking for entries in r and w that
# are not in the socket map.
if [fd for fd in r if fd not in client_map]:
continue
if [fd for fd in w if fd not in client_map]:
continue
raise
else:
continue
if not (r or w or e):
for obj in client_map.itervalues():
if isinstance(obj, Connection):
# Send a heartbeat message as a reply to a
# non-existent message id.
try:
obj.send_reply(-1, None)
except DisconnectedError:
pass
global client_timeout_count
client_timeout_count += 1
continue
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
except:
logger.exception('poll failure')
raise
client_thread = threading.Thread(target=client_loop)
client_thread.setDaemon(True)
client_thread.start()
#
##############################################################################
class Delay: class Delay:
"""Used to delay response to client for synchronous calls. """Used to delay response to client for synchronous calls.
...@@ -235,7 +320,7 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -235,7 +320,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# Client constructor passes 'C' for tag, server constructor 'S'. This # Client constructor passes 'C' for tag, server constructor 'S'. This
# is used in log messages, and to determine whether we can speak with # is used in log messages, and to determine whether we can speak with
# our peer. # our peer.
def __init__(self, sock, addr, obj, tag): def __init__(self, sock, addr, obj, tag, map=None):
self.obj = None self.obj = None
self.marshal = Marshaller() self.marshal = Marshaller()
self.closed = False self.closed = False
...@@ -315,8 +400,10 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -315,8 +400,10 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# isn't necessary before Python 2.4, but doesn't hurt then (it just # isn't necessary before Python 2.4, but doesn't hurt then (it just
# gives us an unused attribute in 2.3); updating the global socket # gives us an unused attribute in 2.3); updating the global socket
# map is necessary regardless of Python version. # map is necessary regardless of Python version.
self._map = asyncore.socket_map if map is None:
asyncore.socket_map.update(ourmap) map = asyncore.socket_map
self._map = map
map.update(ourmap)
def __repr__(self): def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr) return "<%s %s>" % (self.__class__.__name__, self.addr)
...@@ -331,12 +418,13 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -331,12 +418,13 @@ class Connection(smac.SizedMessageAsyncConnection, object):
return return
self._singleton.clear() self._singleton.clear()
self.closed = True self.closed = True
self.close_trigger()
self.__super_close() self.__super_close()
self.close_trigger()
def close_trigger(self): def close_trigger(self):
# Overridden by ManagedClientConnection. # Overridden by ManagedClientConnection.
if self.trigger is not None: if self.trigger is not None:
self.trigger.pull_trigger()
self.trigger.close() self.trigger.close()
def register_object(self, obj): def register_object(self, obj):
...@@ -538,16 +626,16 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -538,16 +626,16 @@ class Connection(smac.SizedMessageAsyncConnection, object):
return r_args return r_args
# For testing purposes, it is useful to begin a synchronous call # For testing purposes, it is useful to begin a synchronous call
# but not block waiting for its response. Since these methods are # but not block waiting for its response.
# used for testing they can assume they are not in async mode and
# call asyncore.poll() directly to get the message out without
# also waiting for the reply.
def _deferred_call(self, method, *args): def _deferred_call(self, method, *args):
if self.closed: if self.closed:
raise DisconnectedError() raise DisconnectedError()
msgid = self.send_call(method, args, 0) msgid = self.send_call(method, args, 0)
asyncore.poll(0.01, self._singleton) if self.is_async():
self.trigger.pull_trigger()
else:
asyncore.poll(0.01, self._singleton)
return msgid return msgid
def _deferred_wait(self, msgid): def _deferred_wait(self, msgid):
...@@ -663,7 +751,7 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -663,7 +751,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
else: else:
asyncore.poll(0.0, self._singleton) asyncore.poll(0.0, self._singleton)
def pending(self, timeout=0): def _pending(self, timeout=0):
"""Invoke mainloop until any pending messages are handled.""" """Invoke mainloop until any pending messages are handled."""
if __debug__: if __debug__:
self.log("pending(), async=%d" % self.is_async(), level=TRACE) self.log("pending(), async=%d" % self.is_async(), level=TRACE)
...@@ -758,8 +846,10 @@ class ManagedClientConnection(Connection): ...@@ -758,8 +846,10 @@ class ManagedClientConnection(Connection):
self.queue_output = True self.queue_output = True
self.queued_messages = [] self.queued_messages = []
self.__super_init(sock, addr, obj, tag='C') self.__super_init(sock, addr, obj, tag='C', map=client_map)
self.check_mgr_async() self.thr_async = True
self.trigger = client_trigger
client_trigger.pull_trigger()
# Our message_ouput() queues messages until recv_handshake() gets the # Our message_ouput() queues messages until recv_handshake() gets the
# protocol handshake from the server. # protocol handshake from the server.
...@@ -806,9 +896,12 @@ class ManagedClientConnection(Connection): ...@@ -806,9 +896,12 @@ class ManagedClientConnection(Connection):
# Defer the ThreadedAsync work to the manager. # Defer the ThreadedAsync work to the manager.
def close_trigger(self): def close_trigger(self):
# the manager should actually close the trigger # We are using a shared trigger for all client connections.
# TODO: what is that comment trying to say? What 'manager'? # We never want to close it.
del self.trigger
# We do want to pull it to make sure the select loop detects that
# we're closed.
self.trigger.pull_trigger()
def set_async(self, map): def set_async(self, map):
pass pass
...@@ -817,20 +910,8 @@ class ManagedClientConnection(Connection): ...@@ -817,20 +910,8 @@ class ManagedClientConnection(Connection):
# Don't do the register_loop_callback that the superclass does # Don't do the register_loop_callback that the superclass does
pass pass
def check_mgr_async(self):
if not self.thr_async and self.mgr.thr_async:
assert self.mgr.trigger is not None, \
"manager (%s) has no trigger" % self.mgr
self.thr_async = True
self.trigger = self.mgr.trigger
return 1
return 0
def is_async(self): def is_async(self):
# TODO: could the check_mgr_async() be avoided on each test? return True
if self.thr_async:
return 1
return self.check_mgr_async()
def close(self): def close(self):
self.mgr.close_conn(self) self.mgr.close_conn(self)
......
...@@ -135,10 +135,10 @@ if os.name == 'posix': ...@@ -135,10 +135,10 @@ if os.name == 'posix':
class trigger(_triggerbase, asyncore.file_dispatcher): class trigger(_triggerbase, asyncore.file_dispatcher):
kind = "pipe" kind = "pipe"
def __init__(self): def __init__(self, map=None):
_triggerbase.__init__(self) _triggerbase.__init__(self)
r, self.trigger = self._fds = os.pipe() r, self.trigger = self._fds = os.pipe()
asyncore.file_dispatcher.__init__(self, r) asyncore.file_dispatcher.__init__(self, r, map)
def _close(self): def _close(self):
for fd in self._fds: for fd in self._fds:
...@@ -155,7 +155,7 @@ else: ...@@ -155,7 +155,7 @@ else:
class trigger(_triggerbase, asyncore.dispatcher): class trigger(_triggerbase, asyncore.dispatcher):
kind = "loopback" kind = "loopback"
def __init__(self): def __init__(self, map=None):
_triggerbase.__init__(self) _triggerbase.__init__(self)
# Get a pair of connected sockets. The trigger is the 'w' # Get a pair of connected sockets. The trigger is the 'w'
...@@ -208,7 +208,7 @@ else: ...@@ -208,7 +208,7 @@ else:
r, addr = a.accept() # r becomes asyncore's (self.)socket r, addr = a.accept() # r becomes asyncore's (self.)socket
a.close() a.close()
self.trigger = w self.trigger = w
asyncore.dispatcher.__init__(self, r) asyncore.dispatcher.__init__(self, r, map)
def _close(self): def _close(self):
# self.socket is r, and self.trigger is w, from __init__ # self.socket is r, and self.trigger is w, from __init__
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment