Commit 44134205 authored by Jim Fulton's avatar Jim Fulton

Fixed the "potential ZODB cache inconsistency after client reconnect"

problem reported on zodb-dev:

http://mail.zope.org/pipermail/zodb-dev/2006-August/010343.html

Added a new invalidateCache protocol for DBs and Connections to
invalidate the entire in-memory caches.  This is used when ZEO clients
reconnect.
parent 084d9815
...@@ -460,6 +460,10 @@ class ClientStorage(object): ...@@ -460,6 +460,10 @@ class ClientStorage(object):
# this method before it was stopped. # this method before it was stopped.
return return
# invalidate our db cache
if self._db is not None:
self._db.invalidateCache()
# TODO: report whether we get a read-only connection. # TODO: report whether we get a read-only connection.
if self._connection is not None: if self._connection is not None:
reconnect = 1 reconnect = 1
......
...@@ -79,6 +79,9 @@ class DummyDB: ...@@ -79,6 +79,9 @@ class DummyDB:
def invalidate(self, *args, **kwargs): def invalidate(self, *args, **kwargs):
pass pass
def invalidateCache(self):
    # No-op stub: a real DB would discard its in-memory object caches
    # here; the dummy DB used by these tests just ignores the request.
    pass
class CommonSetupTearDown(StorageTestBase): class CommonSetupTearDown(StorageTestBase):
"""Common boilerplate""" """Common boilerplate"""
......
...@@ -306,6 +306,53 @@ class CatastrophicClientLoopFailure( ...@@ -306,6 +306,53 @@ class CatastrophicClientLoopFailure(
self.assert_('exc_info' in log[0][1]) self.assert_('exc_info' in log[0][1])
self.assertEqual(log[1][0], "Couldn't close a dispatcher.") self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
self.assert_('exc_info' in log[1][1]) self.assert_('exc_info' in log[1][1])
class ConnectionInvalidationOnReconnect(
    ZEO.tests.ConnectionTests.CommonSetupTearDown):
    """Test that a reconnecting client asks its DB to drop its caches.

    After a disconnect/reconnect cycle the client may have missed
    invalidations, so ClientStorage must call invalidateCache() on the
    registered DB to flush potentially stale in-memory state.
    """

    def getConfig(self, path, create, read_only):
        # An in-memory storage is enough; the test only exercises the
        # connect/disconnect machinery, not persistence.
        return """<mappingstorage 1/>"""

    def checkConnectionInvalidationOnReconnect(self):

        storage = ClientStorage(self.addr, wait=1, min_disconnect_poll=0.1)
        self._storage = storage

        # Wait for the storage to make its initial connection:
        for i in range(100):
            if storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Couldn't connect to server")

        # Minimal stand-in for a ZODB.DB: counts invalidateCache calls.
        class DummyDB:
            _invalidatedCache = 0
            def invalidateCache(self):
                self._invalidatedCache += 1
            def invalidate(*a, **k):
                pass

        db = DummyDB()
        storage.registerDB(db, None)

        base = db._invalidatedCache

        # Now we'll force a disconnection and reconnection
        storage._connection.close()

        # and we'll wait for the storage to be reconnected:
        for i in range(100):
            if storage.is_connected():
                break
            time.sleep(0.1)
        else:
            raise AssertionError("Couldn't connect to server")

        # The reconnect must have triggered exactly one whole-cache
        # invalidation on the registered DB:
        self.assertEqual(db._invalidatedCache, base+1)
class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase): class DemoStorageWrappedAroundClientStorage(DemoStorageWrappedBase):
...@@ -345,6 +392,7 @@ test_classes = [OneTimeTests, ...@@ -345,6 +392,7 @@ test_classes = [OneTimeTests,
DemoStorageWrappedAroundClientStorage, DemoStorageWrappedAroundClientStorage,
HeartbeatTests, HeartbeatTests,
CatastrophicClientLoopFailure, CatastrophicClientLoopFailure,
ConnectionInvalidationOnReconnect,
] ]
def test_suite(): def test_suite():
......
...@@ -159,6 +159,9 @@ class Connection(ExportImport, object): ...@@ -159,6 +159,9 @@ class Connection(ExportImport, object):
self._inv_lock = threading.Lock() self._inv_lock = threading.Lock()
self._invalidated = {} self._invalidated = {}
# Flag indicating whether the cache has been invalidated:
self._invalidatedCache = False
# We intend to prevent committing a transaction in which # We intend to prevent committing a transaction in which
# ReadConflictError occurs. _conflicts is the set of oids that # ReadConflictError occurs. _conflicts is the set of oids that
# experienced ReadConflictError. Any time we raise ReadConflictError, # experienced ReadConflictError. Any time we raise ReadConflictError,
...@@ -311,6 +314,14 @@ class Connection(ExportImport, object): ...@@ -311,6 +314,14 @@ class Connection(ExportImport, object):
finally: finally:
self._inv_lock.release() self._inv_lock.release()
def invalidateCache(self):
    """Record that this connection's entire cache is invalid.

    Sets the _invalidatedCache flag under _inv_lock; the flag is
    consumed at the next transaction boundary, and reads/commits in
    the meantime raise conflict errors.
    """
    lock = self._inv_lock
    lock.acquire()
    try:
        self._invalidatedCache = True
    finally:
        lock.release()
def root(self): def root(self):
"""Return the database root object.""" """Return the database root object."""
return self.get(z64) return self.get(z64)
...@@ -473,6 +484,9 @@ class Connection(ExportImport, object): ...@@ -473,6 +484,9 @@ class Connection(ExportImport, object):
invalidated = self._invalidated invalidated = self._invalidated
self._invalidated = {} self._invalidated = {}
self._txn_time = None self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
finally: finally:
self._inv_lock.release() self._inv_lock.release()
...@@ -524,6 +538,9 @@ class Connection(ExportImport, object): ...@@ -524,6 +538,9 @@ class Connection(ExportImport, object):
self._added_during_commit = [] self._added_during_commit = []
if self._invalidatedCache:
raise ConflictError()
for obj in self._registered_objects: for obj in self._registered_objects:
oid = obj._p_oid oid = obj._p_oid
assert oid assert oid
...@@ -782,6 +799,10 @@ class Connection(ExportImport, object): ...@@ -782,6 +799,10 @@ class Connection(ExportImport, object):
# dict update could go on in another thread, but we don't care # dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway. # because we have to check again after the load anyway.
if self._invalidatedCache:
raise ReadConflictError()
if (obj._p_oid in self._invalidated and if (obj._p_oid in self._invalidated and
not myhasattr(obj, "_p_independent")): not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below. # If the object has _p_independent(), we will handle it below.
...@@ -970,6 +991,7 @@ class Connection(ExportImport, object): ...@@ -970,6 +991,7 @@ class Connection(ExportImport, object):
""" """
self._reset_counter = global_reset_counter self._reset_counter = global_reset_counter
self._invalidated.clear() self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size) self._cache = cache = PickleCache(self, cache_size)
......
...@@ -480,6 +480,12 @@ class DB(object): ...@@ -480,6 +480,12 @@ class DB(object):
c.invalidate(tid, oids) c.invalidate(tid, oids)
self._connectionMap(inval) self._connectionMap(inval)
def invalidateCache(self):
    """Invalidate each of the connection caches.

    Clears the modified-in-version lookup cache and then asks every
    connection in the pool to invalidate its own object cache.
    """
    self._miv_cache.clear()

    def _inval(conn):
        # Forward the whole-cache invalidation to one connection.
        conn.invalidateCache()

    self._connectionMap(_inval)
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
h = hash(oid) % 131 h = hash(oid) % 131
cache = self._miv_cache cache = self._miv_cache
......
...@@ -283,40 +283,51 @@ class IConnection(Interface): ...@@ -283,40 +283,51 @@ class IConnection(Interface):
If clear is True, reset the counters. If clear is True, reset the counters.
""" """
def invalidateCache():
"""Invalidate the connection cache
This invalidates *all* objects in the cache. If the connection
is open, subsequent reads will fail until a new transaction
begins or until the connection is reopened.
"""
class IDatabase(Interface): class IDatabase(Interface):
"""ZODB DB. """ZODB DB.
TODO: This interface is incomplete. TODO: This interface is incomplete.
""" """
def __init__(storage, ## __init__ methods don't belong in interfaces:
pool_size=7, ##
cache_size=400, ## def __init__(storage,
version_pool_size=3, ## pool_size=7,
version_cache_size=100, ## cache_size=400,
database_name='unnamed', ## version_pool_size=3,
databases=None, ## version_cache_size=100,
): ## database_name='unnamed',
"""Create an object database. ## databases=None,
## ):
storage: the storage used by the database, e.g. FileStorage ## """Create an object database.
pool_size: expected maximum number of open connections
cache_size: target size of Connection object cache, in number of ## storage: the storage used by the database, e.g. FileStorage
objects ## pool_size: expected maximum number of open connections
version_pool_size: expected maximum number of connections (per ## cache_size: target size of Connection object cache, in number of
version) ## objects
version_cache_size: target size of Connection object cache for ## version_pool_size: expected maximum number of connections (per
version connections, in number of objects ## version)
database_name: when using a multi-database, the name of this DB ## version_cache_size: target size of Connection object cache for
within the database group. It's a (detected) error if databases ## version connections, in number of objects
is specified too and database_name is already a key in it. ## database_name: when using a multi-database, the name of this DB
This becomes the value of the DB's database_name attribute. ## within the database group. It's a (detected) error if databases
databases: when using a multi-database, a mapping to use as the ## is specified too and database_name is already a key in it.
binding of this DB's .databases attribute. It's intended ## This becomes the value of the DB's database_name attribute.
that the second and following DB's added to a multi-database ## databases: when using a multi-database, a mapping to use as the
pass the .databases attribute set on the first DB added to the ## binding of this DB's .databases attribute. It's intended
collection. ## that the second and following DB's added to a multi-database
""" ## pass the .databases attribute set on the first DB added to the
## collection.
## """
databases = Attribute("""\ databases = Attribute("""\
A mapping from database name to DB (database) object. A mapping from database name to DB (database) object.
...@@ -328,6 +339,12 @@ class IDatabase(Interface): ...@@ -328,6 +339,12 @@ class IDatabase(Interface):
entry. entry.
""") """)
def invalidateCache():
"""Invalidate all objects in the database object caches
invalidateCache will be called on each of the database's connections.
"""
class IStorage(Interface): class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects. """A storage is responsible for storing and retrieving data of objects.
""" """
......
...@@ -478,6 +478,80 @@ class InvalidationTests(unittest.TestCase): ...@@ -478,6 +478,80 @@ class InvalidationTests(unittest.TestCase):
""" """
# Doctest: Connection.invalidateCache blocks reads/commits until the
# transaction boundary, then clears the cache.
def test_invalidateCache():
    """\
The invalidateCache method invalidates a connection's cache.  It also
prevents reads until the end of a transaction.

    >>> from ZODB.tests.util import DB
    >>> import transaction
    >>> db = DB()
    >>> tm = transaction.TransactionManager()
    >>> connection = db.open(transaction_manager=tm)
    >>> connection.root()['a'] = StubObject()
    >>> connection.root()['a'].x = 1
    >>> connection.root()['b'] = StubObject()
    >>> connection.root()['b'].x = 1
    >>> connection.root()['c'] = StubObject()
    >>> connection.root()['c'].x = 1
    >>> tm.commit()
    >>> connection.root()['b']._p_deactivate()
    >>> connection.root()['c'].x = 2

So we have a connection and an active transaction with some
modifications.  Let's call invalidateCache:

    >>> connection.invalidateCache()

Now, if we try to load an object, we'll get a read conflict:

    >>> connection.root()['b'].x
    Traceback (most recent call last):
    ...
    ReadConflictError: database read conflict error

If we try to commit the transaction, we'll get a conflict error:

    >>> tm.commit()
    Traceback (most recent call last):
    ...
    ConflictError: database conflict error

and the cache will have been cleared:

    >>> print connection.root()['a']._p_changed
    None
    >>> print connection.root()['b']._p_changed
    None
    >>> print connection.root()['c']._p_changed
    None

But we'll be able to access data again:

    >>> connection.root()['b'].x
    1

Aborting a transaction after a read conflict also lets us read data
and go on about our business:

    >>> connection.invalidateCache()

    >>> connection.root()['c'].x
    Traceback (most recent call last):
    ...
    ReadConflictError: database read conflict error

    >>> tm.abort()
    >>> connection.root()['c'].x
    1

    >>> connection.root()['c'].x = 2
    >>> tm.commit()

    >>> db.close()
    """
# ---- stubs # ---- stubs
class StubObject(Persistent): class StubObject(Persistent):
......
...@@ -18,6 +18,8 @@ import warnings ...@@ -18,6 +18,8 @@ import warnings
import transaction import transaction
from zope.testing import doctest
import ZODB import ZODB
import ZODB.FileStorage import ZODB.FileStorage
...@@ -130,5 +132,49 @@ class DBTests(unittest.TestCase): ...@@ -130,5 +132,49 @@ class DBTests(unittest.TestCase):
self.assertEqual(nconn(pools), 3) self.assertEqual(nconn(pools), 3)
# Doctest: DB.invalidateCache fans out to every connection attached to
# the database, including pooled (closed) ones.
def test_invalidateCache():
    """\
The invalidateCache method invalidates the connection caches for all
of the connections attached to a database.

    >>> from ZODB.tests.util import DB
    >>> import transaction
    >>> db = DB()
    >>> tm1 = transaction.TransactionManager()
    >>> c1 = db.open(transaction_manager=tm1)
    >>> c1.root()['a'] = MinPO(1)
    >>> tm1.commit()
    >>> tm2 = transaction.TransactionManager()
    >>> c2 = db.open(transaction_manager=tm2)
    >>> c1.root()['a']._p_deactivate()
    >>> tm3 = transaction.TransactionManager()
    >>> c3 = db.open(transaction_manager=tm3)
    >>> c3.root()['a'].value
    1
    >>> c3.close()
    >>> db.invalidateCache()

    >>> c1.root()['a'].value
    Traceback (most recent call last):
    ...
    ReadConflictError: database read conflict error

    >>> c2.root()['a'].value
    Traceback (most recent call last):
    ...
    ReadConflictError: database read conflict error

    >>> c3 is db.open(transaction_manager=tm3)
    True
    >>> print c3.root()['a']._p_changed
    None

    >>> db.close()
    """
def test_suite(): def test_suite():
return unittest.makeSuite(DBTests) s = unittest.makeSuite(DBTests)
s.addTest(doctest.DocTestSuite())
return s
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment