Commit e044bfcc authored by Jeremy Hylton's avatar Jeremy Hylton

Bacport various cache consistency bug fixes from the ZODB3-3_1-branch.

parent 1616e14c
......@@ -268,6 +268,15 @@ class ClientStorage:
self._oid_lock = threading.Lock()
self._oids = [] # Object ids retrieved from new_oids()
# load() and tpc_finish() must be serialized to guarantee
# that cache modifications from each occur atomically.
# It also prevents multiple load calls occuring simultaneously,
# which simplifies the cache logic.
self._load_lock = threading.Lock()
# _load_oid and _load_status are protected by _lock
self._load_oid = None
self._load_status = None
# Can't read data in one thread while writing data
# (tpc_finish) in another thread. In general, the lock
# must prevent access to the cache while _update_cache
......@@ -696,20 +705,37 @@ class ClientStorage:
"""
self._lock.acquire() # for atomic processing of invalidations
try:
p = self._cache.load(oid, version)
if p:
return p
pair = self._cache.load(oid, version)
if pair:
return pair
finally:
self._lock.release()
if self._server is None:
raise ClientDisconnected()
# If an invalidation for oid comes in during zeoLoad, that's OK
# because we'll get oid's new state.
p, s, v, pv, sv = self._server.zeoLoad(oid)
self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv)
self._load_lock.acquire()
try:
self._lock.acquire()
try:
self._load_oid = oid
self._load_status = 1
finally:
self._lock.release()
p, s, v, pv, sv = self._server.zeoLoad(oid)
self._lock.acquire() # for atomic processing of invalidations
try:
if self._load_status:
self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv)
self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
if v and version and v == version:
return pv, sv
else:
......@@ -864,22 +890,22 @@ class ClientStorage:
"""Storage API: finish a transaction."""
if transaction is not self._transaction:
return
self._load_lock.acquire()
try:
self._lock.acquire() # for atomic processing of invalidations
try:
self._update_cache()
if f is not None:
f()
finally:
self._lock.release()
if f is not None:
f()
tid = self._server.tpc_finish(self._serial)
self._cache.setLastTid(tid)
self._server.tpc_finish(self._serial)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
finally:
self._load_lock.release()
self.end_transaction()
def _update_cache(self):
......@@ -1000,15 +1026,17 @@ class ClientStorage:
# Invalidations are sent by the ZEO server as a sequence of
# oid, version pairs. The DB's invalidate() method expects a
# dictionary of oids.
self._lock.acquire()
try:
# versions maps version names to dictionary of invalidations
versions = {}
for oid, version in invs:
d = versions.setdefault(version, {})
if oid == self._load_oid:
self._load_status = 0
self._cache.invalidate(oid, version=version)
d[oid] = 1
versions.setdefault(version, {})[oid] = 1
if self._db is not None:
for v, d in versions.items():
self._db.invalidate(d, version=v)
......@@ -1038,10 +1066,10 @@ class ClientStorage:
"""Invalidate objects modified by tid."""
self._cache.setLastTid(tid)
if self._pickler is not None:
self.log("Transactional invalidation during cache verification",
level=zLOG.BLATHER)
log2(BLATHER,
"Transactional invalidation during cache verification")
for t in args:
self.self._pickler.dump(t)
self._pickler.dump(t)
return
self._process_invalidations(args)
......
......@@ -53,7 +53,7 @@ class ClientStorage:
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
self.rpc.callAsync('invalidateTransaction', tid, args)
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
......
......@@ -30,6 +30,7 @@ from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller
from ZEO.tests import forker
from ZEO.tests.InvalidationTests import InvalidationTests
from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction
......@@ -198,7 +199,7 @@ class CommonSetupTearDown(StorageTestBase):
self.fail("timed out waiting for storage to disconnect")
class ConnectionTests(CommonSetupTearDown):
class ConnectionTests(CommonSetupTearDown, InvalidationTests):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
......
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from thread import get_ident
import threading
import time
from BTrees.check import check, display
from BTrees.OOBTree import OOBTree
from ZEO.tests.TestThread import TestThread
from ZODB.DB import DB
from ZODB.POSException \
import ReadConflictError, ConflictError, VersionLockError
import zLOG
class StressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
def testrun(self):
cn = self.db.open()
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
get_transaction().abort()
cn.sync()
key = self.startnum
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().note("add key %s" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
else:
self.added_keys.append(key)
key += self.step
cn.close()
class VersionStressThread(TestThread):
def __init__(self, testcase, db, stop, threadnum, startnum,
step=2, sleep=None):
TestThread.__init__(self, testcase)
self.db = db
self.stop = stop
self.threadnum = threadnum
self.startnum = startnum
self.step = step
self.sleep = sleep
self.added_keys = []
def log(self, msg):
zLOG.LOG("thread %d" % get_ident(), 0, msg)
def testrun(self):
self.log("thread begin")
commit = 0
key = self.startnum
while not self.stop.isSet():
version = "%s:%s" % (self.threadnum, key)
commit = not commit
self.log("attempt to add key=%s version=%s commit=%d" %
(key, version, commit))
if self.oneupdate(version, key, commit):
self.added_keys.append(key)
key += self.step
def oneupdate(self, version, key, commit=1):
# The mess of sleeps below were added to reduce the number
# of VersionLockErrors, based on empirical observation.
# It looks like the threads don't switch enough without
# the sleeps.
cn = self.db.open(version)
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
get_transaction().abort()
cn.sync()
while not self.stop.isSet():
try:
tree[key] = self.threadnum
get_transaction().note("add key %d" % key)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
break
except (VersionLockError, ReadConflictError, ConflictError), msg:
self.log(msg)
get_transaction().abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
if self.sleep:
time.sleep(self.sleep)
try:
while not self.stop.isSet():
try:
if commit:
self.db.commitVersion(version)
get_transaction().note("commit version %s" % version)
else:
self.db.abortVersion(version)
get_transaction().note("abort version %s" % version)
get_transaction().commit()
if self.sleep:
time.sleep(self.sleep)
return commit
except ConflictError, msg:
self.log(msg)
get_transaction().abort()
cn.sync()
finally:
cn.close()
return 0
class InvalidationTests:
level = 2
DELAY = 15
def _check_tree(self, cn, tree):
# Make sure the BTree is sane and that all the updates persisted
retries = 3
while retries:
retries -= 1
try:
check(tree)
tree._check()
except ReadConflictError:
if retries:
get_transaction().abort()
cn.sync()
else:
raise
except:
display(tree)
raise
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
for t in threads:
# If the test didn't add any keys, it didn't do what we expected.
self.assert_(t.added_keys)
for key in t.added_keys:
self.assert_(tree.has_key(key), key)
def go(self, stop, *threads):
# Run the threads
for t in threads:
t.start()
time.sleep(self.DELAY)
stop.set()
for t in threads:
t.cleanup()
def checkConcurrentUpdates2Storages(self):
self._storage = storage1 = self.openClientStorage()
storage2 = self.openClientStorage(cache="2")
db1 = DB(storage1)
db2 = DB(storage2)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1)
t2 = StressThread(self, db2, stop, 2, 2)
self.go(stop, t1, t2)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2)
cn.close()
db1.close()
db2.close()
def checkConcurrentUpdates1Storage(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run two threads that update the BTree
t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
self.go(stop, t1, t2)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2)
cn.close()
db1.close()
def checkConcurrentUpdates2StoragesMT(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
db2 = DB(self.openClientStorage(cache="2"))
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = StressThread(self, db1, stop, 1, 1, 3)
t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
def checkConcurrentUpdatesInVersions(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
db2 = DB(self.openClientStorage(cache="2"))
stop = threading.Event()
cn = db1.open()
tree = cn.root()["tree"] = OOBTree()
get_transaction().commit()
# Run three threads that update the BTree.
# Two of the threads share a single storage so that it
# is possible for both threads to read the same object
# at the same time.
t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
self.go(stop, t1, t2, t3)
cn.sync()
self._check_tree(cn, tree)
self._check_threads(tree, t1, t2, t3)
cn.close()
db1.close()
db2.close()
......@@ -110,12 +110,9 @@ class MappingStorageTimeoutTests(
test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests,
FileStorageTimeoutTests]
test_classes.extend(
[MappingStorageConnectionTests,
MappingStorageTimeoutTests])
FileStorageTimeoutTests,
MappingStorageConnectionTests,
MappingStorageTimeoutTests]
import BDBStorage
if BDBStorage.is_available:
......
......@@ -398,6 +398,14 @@ class Connection(smac.SizedMessageAsyncConnection):
self.send_call(method, args, ASYNC)
self.poll()
def callAsyncNoPoll(self, method, *args):
# Like CallAsync but doesn't poll. This exists so that we can
# send invalidations atomically to all clients without
# allowing any client to sneak in a load request.
if self.closed:
raise DisconnectedError()
self.send_call(method, args, ASYNC)
# handle IO, possibly in async mode
def _prepare_async(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment