Commit b7325a8d authored by Jeremy Hylton's avatar Jeremy Hylton

Fix ZEO to work correctly with atomic invalidations.

parent edfa017f
...@@ -903,25 +903,32 @@ class ClientStorage: ...@@ -903,25 +903,32 @@ class ClientStorage:
return return
self._pickler.dump(args) self._pickler.dump(args)
def _process_invalidations(self, invs):
# Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
# versions maps version names to dictionary of invalidations
versions = {}
for oid, version in invs:
d = versions.setdefault(version, {})
self._cache.invalidate(oid, version=version)
d[oid] = 1
if self._db is not None:
for v, d in versions.items():
self._db.invalidate(d, version=v)
def endVerify(self): def endVerify(self):
"""Server callback to signal end of cache validation.""" """Server callback to signal end of cache validation."""
if self._pickler is None: if self._pickler is None:
return return
self._pickler.dump((0,0)) # write end-of-data marker
self._pickler.dump((None, None))
self._pickler = None self._pickler = None
self._tfile.seek(0) self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
f = self._tfile f = self._tfile
self._tfile = None self._tfile = None
self._process_invalidations(InvalidationLogIterator(f))
while 1:
oid, version = unpick.load()
log2(INFO, "verify invalidate %r" % oid)
if not oid:
break
self._cache.invalidate(oid, version=version)
if self._db is not None:
self._db.invalidate(oid, version=version)
f.close() f.close()
log2(INFO, "endVerify finishing") log2(INFO, "endVerify finishing")
...@@ -939,11 +946,7 @@ class ClientStorage: ...@@ -939,11 +946,7 @@ class ClientStorage:
for t in args: for t in args:
self.self._pickler.dump(t) self.self._pickler.dump(t)
return return
db = self._db self._process_invalidations(args)
for oid, version in args:
self._cache.invalidate(oid, version=version)
if db is not None:
db.invalidate(oid, version=version)
# The following are for compatibility with protocol version 2.0.0 # The following are for compatibility with protocol version 2.0.0
...@@ -954,4 +957,18 @@ class ClientStorage: ...@@ -954,4 +957,18 @@ class ClientStorage:
end = endVerify end = endVerify
Invalidate = invalidateTrans Invalidate = invalidateTrans
class InvalidationLogIterator:
"""Helper class for reading invalidations in endVerify."""
# XXX will require extra work to backport to Python 2.1
def __init__(self, fileobj):
self._unpickler = cPickle.Unpickler(fileobj)
def __iter__(self):
return self
def next(self):
oid, version = self._unpickler.load()
if oid is None:
raise StopIteration
return oid, version
...@@ -31,6 +31,7 @@ from ZEO.Exceptions import ClientDisconnected ...@@ -31,6 +31,7 @@ from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller from ZEO.zrpc.marshal import Marshaller
from ZEO.tests import forker from ZEO.tests import forker
from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError from ZODB.POSException import ReadOnlyError
from ZODB.tests.StorageTestBase import StorageTestBase from ZODB.tests.StorageTestBase import StorageTestBase
...@@ -61,6 +62,7 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -61,6 +62,7 @@ class CommonSetupTearDown(StorageTestBase):
invq = None invq = None
timeout = None timeout = None
monitor = 0 monitor = 0
db_class = DummyDB
def setUp(self): def setUp(self):
"""Test setup for connection tests. """Test setup for connection tests.
...@@ -472,6 +474,36 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -472,6 +474,36 @@ class ConnectionTests(CommonSetupTearDown):
for t in threads: for t in threads:
t.closeclients() t.closeclients()
def checkCrossDBInvalidations(self):
db1 = DB(self.openClientStorage())
c1 = db1.open()
r1 = c1.root()
r1["a"] = MinPO("a")
get_transaction().commit()
db2 = DB(self.openClientStorage())
r2 = db2.open().root()
self.assertEqual(r2["a"].value, "a")
r2["b"] = MinPO("b")
get_transaction().commit()
# make sure the invalidation is received in the other client
c1._storage.sync()
self.assert_(r1._p_oid in c1._invalidated)
# force the invalidations to be applied...
c1.setLocalTransaction()
c1.getTransaction().register(c1)
c1.getTransaction().abort()
r1.keys() # unghostify
self.assertEqual(r1._p_serial, r2._p_serial)
db2.close()
db1.close()
class ReconnectionTests(CommonSetupTearDown): class ReconnectionTests(CommonSetupTearDown):
keep = 1 keep = 1
invq = 2 invq = 2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment