Commit 6795ca8f authored by Jeremy Hylton's avatar Jeremy Hylton

Fix bug in quick verification.

The server was sending all the invalidations from the queue, not just
the ones that were later than the requested tid.  This didn't affect
correctness of the cache, but did cause it to throw out valid data.

Add test case to verify that getInvalidations() returns only what is
expected.

Bug fix candidate.
parent c2c5a344
...@@ -731,7 +731,10 @@ class StorageServer: ...@@ -731,7 +731,10 @@ class StorageServer:
self.database = None self.database = None
if auth_protocol: if auth_protocol:
self._setup_auth(auth_protocol) self._setup_auth(auth_protocol)
# A list of at most invalidation_queue_size invalidations # A list of at most invalidation_queue_size invalidations.
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
self.invq = [] self.invq = []
self.invq_bound = invalidation_queue_size self.invq_bound = invalidation_queue_size
self.connections = {} self.connections = {}
...@@ -849,8 +852,8 @@ class StorageServer: ...@@ -849,8 +852,8 @@ class StorageServer:
""" """
if invalidated: if invalidated:
if len(self.invq) >= self.invq_bound: if len(self.invq) >= self.invq_bound:
del self.invq[0] self.invq.pop()
self.invq.append((tid, invalidated)) self.invq.insert(0, (tid, invalidated))
for p in self.connections.get(storage_id, ()): for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn: if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated) p.client.invalidateTransaction(tid, invalidated)
...@@ -871,19 +874,18 @@ class StorageServer: ...@@ -871,19 +874,18 @@ class StorageServer:
log("invq empty") log("invq empty")
return None, [] return None, []
earliest_tid = self.invq[0][0] earliest_tid = self.invq[-1][0]
if earliest_tid > tid: if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid))) log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, [] return None, []
# XXX this is wrong! must check against tid or we invalidate
# too much.
oids = {} oids = {}
for tid, L in self.invq: for _tid, L in self.invq:
if _tid <= tid:
break
for key in L: for key in L:
oids[key] = 1 oids[key] = 1
latest_tid = self.invq[-1][0] latest_tid = self.invq[0][0]
return latest_tid, oids.keys() return latest_tid, oids.keys()
def close_server(self): def close_server(self):
......
...@@ -23,6 +23,7 @@ import threading ...@@ -23,6 +23,7 @@ import threading
import zLOG import zLOG
import ZEO.ServerStub
from ZEO.ClientStorage import ClientStorage from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller from ZEO.zrpc.marshal import Marshaller
...@@ -34,13 +35,24 @@ from ZODB.Transaction import get_transaction, Transaction ...@@ -34,13 +35,24 @@ from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError, ConflictError from ZODB.POSException import ReadOnlyError, ConflictError
from ZODB.tests.StorageTestBase import StorageTestBase from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle from ZODB.tests.StorageTestBase \
from ZODB.tests.StorageTestBase import handle_all_serials, ZERO import zodb_pickle, zodb_unpickle, handle_all_serials, handle_serials
from ZODB.tests.StorageTestBase import handle_serials
ZERO = '\0'*8
class TestServerStub(ZEO.ServerStub.StorageServer):
__super_getInvalidations = ZEO.ServerStub.StorageServer.getInvalidations
def getInvalidations(self, tid):
# squirrel the results away for inspection by test case
self._last_invals = self.__super_getInvalidations(tid)
return self._last_invals
class TestClientStorage(ClientStorage): class TestClientStorage(ClientStorage):
test_connection = 0 test_connection = False
StorageServerStubClass = TestServerStub
def verify_cache(self, stub): def verify_cache(self, stub):
self.end_verify = threading.Event() self.end_verify = threading.Event()
...@@ -54,7 +66,7 @@ class TestClientStorage(ClientStorage): ...@@ -54,7 +66,7 @@ class TestClientStorage(ClientStorage):
try: try:
return ClientStorage.testConnection(self, conn) return ClientStorage.testConnection(self, conn)
finally: finally:
self.test_connection = 1 self.test_connection = True
class DummyDB: class DummyDB:
def invalidate(self, *args, **kwargs): def invalidate(self, *args, **kwargs):
...@@ -571,7 +583,7 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -571,7 +583,7 @@ class ConnectionTests(CommonSetupTearDown):
db1.close() db1.close()
class InvqTests(CommonSetupTearDown): class InvqTests(CommonSetupTearDown):
invq = 2 invq = 3
def checkQuickVerificationWith2Clients(self): def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test") perstorage = self.openClientStorage(cache="test")
...@@ -587,7 +599,10 @@ class InvqTests(CommonSetupTearDown): ...@@ -587,7 +599,10 @@ class InvqTests(CommonSetupTearDown):
# message is generated # message is generated
revid = self._dostore(oid) revid = self._dostore(oid)
revid = self._dostore(oid, revid) revid = self._dostore(oid, revid)
self._dostore(oid2) # Create a second object and revision to guarantee it doesn't
# show up in the list of invalidations sent when perstore restarts.
revid2 = self._dostore(oid2)
revid2 = self._dostore(oid2, revid2)
# sync() is needed to prevent invalidation for oid from arriving # sync() is needed to prevent invalidation for oid from arriving
# in the middle of the load() call. # in the middle of the load() call.
...@@ -596,10 +611,11 @@ class InvqTests(CommonSetupTearDown): ...@@ -596,10 +611,11 @@ class InvqTests(CommonSetupTearDown):
perstorage.close() perstorage.close()
revid = self._dostore(oid, revid) revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test") perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification") self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage._server._last_invals,
(revid, [(oid, '')]))
self.assertEqual(perstorage.load(oid, ''), self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, '')) self._storage.load(oid, ''))
perstorage.close() perstorage.close()
...@@ -865,7 +881,6 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -865,7 +881,6 @@ class TimeoutTests(CommonSetupTearDown):
# Create the object # Create the object
oid = storage.new_oid() oid = storage.new_oid()
obj = MinPO(7) obj = MinPO(7)
ZERO = '\0'*8
# Now do a store, sleeping before the finish so as to cause a timeout # Now do a store, sleeping before the finish so as to cause a timeout
t = Transaction() t = Transaction()
storage.tpc_begin(t) storage.tpc_begin(t)
...@@ -895,7 +910,6 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -895,7 +910,6 @@ class TimeoutTests(CommonSetupTearDown):
# Create the object # Create the object
oid = storage.new_oid() oid = storage.new_oid()
obj = MinPO(7) obj = MinPO(7)
ZERO = '\0'*8
# We need to successfully commit an object now so we have something to # We need to successfully commit an object now so we have something to
# conflict about. # conflict about.
t = Transaction() t = Transaction()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment