Commit 4e422371 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Remove ZODB test module copy, use it from imports.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1780 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c457898e
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# Copyright (C) 2009-2010 Nexedi SA
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
##############################################################################
import unittest
import ZODB
import ZODB.FileStorage
from ZODB.POSException import ReadConflictError
from ZODB.POSException import TransactionFailedError
from ZODB.tests.warnhook import WarningsHook
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from persistent import Persistent
from persistent.mapping import PersistentMapping
import transaction
import unittest
from ZODB.tests.testZODB import ZODBTests
from neo.tests.zodb import ZODBTestCase
class P(Persistent):
class NEOZODBTests(ZODBTestCase, ZODBTests):
pass
class Independent(Persistent):
def _p_independent(self):
return 1
class DecoyIndependent(Persistent):
def _p_independent(self):
return 0
class ZODBTests(ZODBTestCase):
def populate(self):
transaction.begin()
conn = self._db.open()
root = conn.root()
root['test'] = pm = PersistentMapping()
for n in range(100):
pm[n] = PersistentMapping({0: 100 - n})
transaction.get().note('created test data')
transaction.commit()
conn.close()
def checkExportImport(self, abort_it=False):
self.populate()
conn = self._db.open()
try:
self.duplicate(conn, abort_it)
finally:
conn.close()
conn = self._db.open()
try:
self.verify(conn, abort_it)
finally:
conn.close()
def duplicate(self, conn, abort_it):
transaction.begin()
transaction.get().note('duplication')
root = conn.root()
ob = root['test']
assert len(ob) > 10, 'Insufficient test data'
try:
import tempfile
f = tempfile.TemporaryFile()
ob._p_jar.exportFile(ob._p_oid, f)
assert f.tell() > 0, 'Did not export correctly'
f.seek(0)
new_ob = ob._p_jar.importFile(f)
self.assertEqual(new_ob, ob)
root['dup'] = new_ob
f.close()
if abort_it:
transaction.abort()
else:
transaction.commit()
except:
transaction.abort()
raise
def verify(self, conn, abort_it):
transaction.begin()
root = conn.root()
ob = root['test']
try:
ob2 = root['dup']
except KeyError:
if abort_it:
# Passed the test.
return
else:
raise
else:
self.assertTrue(not abort_it, 'Did not abort duplication')
l1 = list(ob.items())
l1.sort()
l2 = list(ob2.items())
l2.sort()
l1 = map(lambda (k, v): (k, v[0]), l1)
l2 = map(lambda (k, v): (k, v[0]), l2)
self.assertEqual(l1, l2)
self.assert_(ob._p_oid != ob2._p_oid)
self.assertEqual(ob._p_jar, ob2._p_jar)
oids = {}
for v in ob.values():
oids[v._p_oid] = 1
for v in ob2.values():
assert not oids.has_key(v._p_oid), (
'Did not fully separate duplicate from original')
transaction.commit()
def checkExportImportAborted(self):
self.checkExportImport(abort_it=True)
def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op
# still allow things like abortVersion(). This should work
# because abortVersion() calls tpc_begin() itself.
conn = self._db.open("version")
try:
r = conn.root()
r[1] = 1
transaction.commit()
finally:
conn.close()
self._db.abortVersion("version")
transaction.commit()
def checkResetCache(self):
# The cache size after a reset should be 0. Note that
# _resetCache is not a public API, but the resetCaches()
# function is, and resetCaches() causes _resetCache() to be
# called.
self.populate()
conn = self._db.open()
conn.root()
self.assert_(len(conn._cache) > 0) # Precondition
conn._resetCache()
self.assertEqual(len(conn._cache), 0)
def checkResetCachesAPI(self):
# Checks the resetCaches() API.
# (resetCaches used to be called updateCodeTimestamp.)
self.populate()
conn = self._db.open()
conn.root()
self.assert_(len(conn._cache) > 0) # Precondition
ZODB.Connection.resetCaches()
conn.close()
self.assert_(len(conn._cache) > 0) # Still not flushed
conn._setDB(self._db) # simulate the connection being reopened
self.assertEqual(len(conn._cache), 0)
def checkExplicitTransactionManager(self):
# Test of transactions that apply to only the connection,
# not the thread.
tm1 = transaction.TransactionManager()
conn1 = self._db.open(transaction_manager=tm1)
tm2 = transaction.TransactionManager()
conn2 = self._db.open(transaction_manager=tm2)
try:
r1 = conn1.root()
r2 = conn2.root()
if r1.has_key('item'):
del r1['item']
tm1.get().commit()
r1.get('item')
r2.get('item')
r1['item'] = 1
tm1.get().commit()
self.assertEqual(r1['item'], 1)
# r2 has not seen a transaction boundary,
# so it should be unchanged.
self.assertEqual(r2.get('item'), None)
conn2.sync()
# Now r2 is updated.
self.assertEqual(r2['item'], 1)
# Now, for good measure, send an update in the other direction.
r2['item'] = 2
tm2.get().commit()
self.assertEqual(r1['item'], 1)
self.assertEqual(r2['item'], 2)
conn1.sync()
conn2.sync()
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
finally:
conn1.close()
conn2.close()
def checkLocalTransactions(self):
# Test of transactions that apply to only the connection,
# not the thread.
conn1 = self._db.open()
conn2 = self._db.open()
hook = WarningsHook()
hook.install()
try:
conn1.setLocalTransaction()
conn2.setLocalTransaction()
r1 = conn1.root()
r2 = conn2.root()
if r1.has_key('item'):
del r1['item']
conn1.getTransaction().commit()
r1.get('item')
r2.get('item')
r1['item'] = 1
conn1.getTransaction().commit()
self.assertEqual(r1['item'], 1)
# r2 has not seen a transaction boundary,
# so it should be unchanged.
self.assertEqual(r2.get('item'), None)
conn2.sync()
# Now r2 is updated.
self.assertEqual(r2['item'], 1)
# Now, for good measure, send an update in the other direction.
r2['item'] = 2
conn2.getTransaction().commit()
self.assertEqual(r1['item'], 1)
self.assertEqual(r2['item'], 2)
conn1.sync()
conn2.sync()
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
for msg, obj, filename, lineno in hook.warnings:
self.assert_(msg in [
"This will be removed in ZODB 3.6:\n"
"setLocalTransaction() is deprecated. "
"Use the transaction_manager argument "
"to DB.open() instead.",
"This will be removed in ZODB 3.6:\n"
"getTransaction() is deprecated. "
"Use the transaction_manager argument "
"to DB.open() instead, or access "
".transaction_manager directly on the Connection."])
finally:
conn1.close()
conn2.close()
hook.uninstall()
def checkReadConflict(self):
self.obj = P()
self.readConflict()
def readConflict(self, shouldFail=True):
# Two transactions run concurrently. Each reads some object,
# then one commits and the other tries to read an object
# modified by the first. This read should fail with a conflict
# error because the object state read is not necessarily
# consistent with the objects read earlier in the transaction.
tm1 = transaction.TransactionManager()
conn = self._db.open(mvcc=False, transaction_manager=tm1)
r1 = conn.root()
r1["p"] = self.obj
self.obj.child1 = P()
tm1.get().commit()
# start a new transaction with a new connection
tm2 = transaction.TransactionManager()
cn2 = self._db.open(mvcc=False, transaction_manager=tm2)
# start a new transaction with the other connection
r2 = cn2.root()
self.assertEqual(r1._p_serial, r2._p_serial)
self.obj.child2 = P()
tm1.get().commit()
# resume the transaction using cn2
obj = r2["p"]
# An attempt to access obj should fail, because r2 was read
# earlier in the transaction and obj was modified by the othe
# transaction.
if shouldFail:
self.assertRaises(ReadConflictError, lambda: obj.child1)
# And since ReadConflictError was raised, attempting to commit
# the transaction should re-raise it. checkNotIndependent()
# failed this part of the test for a long time.
self.assertRaises(ReadConflictError, tm2.get().commit)
# And since that commit failed, trying to commit again should
# fail again.
self.assertRaises(TransactionFailedError, tm2.get().commit)
# And again.
self.assertRaises(TransactionFailedError, tm2.get().commit)
# Etc.
self.assertRaises(TransactionFailedError, tm2.get().commit)
else:
# make sure that accessing the object succeeds
obj.child1
tm2.get().abort()
def checkReadConflictIgnored(self):
# Test that an application that catches a read conflict and
# continues can not commit the transaction later.
root = self._db.open(mvcc=False).root()
root["real_data"] = real_data = PersistentMapping()
root["index"] = index = PersistentMapping()
real_data["a"] = PersistentMapping({"indexed_value": 0})
real_data["b"] = PersistentMapping({"indexed_value": 1})
index[1] = PersistentMapping({"b": 1})
index[0] = PersistentMapping({"a": 1})
transaction.commit()
# load some objects from one connection
tm = transaction.TransactionManager()
cn2 = self._db.open(mvcc=False, transaction_manager=tm)
r2 = cn2.root()
real_data2 = r2["real_data"]
index2 = r2["index"]
real_data["b"]["indexed_value"] = 0
del index[1]["b"]
index[0]["b"] = 1
transaction.commit()
del real_data2["a"]
try:
del index2[0]["a"]
except ReadConflictError:
# This is the crux of the text. Ignore the error.
pass
else:
self.fail("No conflict occurred")
# real_data2 still ready to commit
self.assert_(real_data2._p_changed)
# index2 values not ready to commit
self.assert_(not index2._p_changed)
self.assert_(not index2[0]._p_changed)
self.assert_(not index2[1]._p_changed)
self.assertRaises(ReadConflictError, tm.get().commit)
self.assertRaises(TransactionFailedError, tm.get().commit)
tm.get().abort()
def checkIndependent(self):
self.obj = Independent()
self.readConflict(shouldFail=False)
def checkNotIndependent(self):
self.obj = DecoyIndependent()
self.readConflict()
def checkSubtxnCommitDoesntGetInvalidations(self):
# Prior to ZODB 3.2.9 and 3.4, Connection.tpc_finish() processed
# invalidations even for a subtxn commit. This could make
# inconsistent state visible after a subtxn commit. There was a
# suspicion that POSKeyError was possible as a result, but I wasn't
# able to construct a case where that happened.
# Set up the database, to hold
# root --> "p" -> value = 1
# --> "q" -> value = 2
tm1 = transaction.TransactionManager()
conn = self._db.open(transaction_manager=tm1)
r1 = conn.root()
p = P()
p.value = 1
r1["p"] = p
q = P()
q.value = 2
r1["q"] = q
tm1.commit()
# Now txn T1 changes p.value to 3 locally (subtxn commit).
p.value = 3
tm1.commit(True)
# Start new txn T2 with a new connection.
tm2 = transaction.TransactionManager()
cn2 = self._db.open(transaction_manager=tm2)
r2 = cn2.root()
p2 = r2["p"]
self.assertEqual(p._p_oid, p2._p_oid)
# T2 shouldn't see T1's change of p.value to 3, because T1 didn't
# commit yet.
self.assertEqual(p2.value, 1)
# Change p.value to 4, and q.value to 5. Neither should be visible
# to T1, because T1 is still in progress.
p2.value = 4
q2 = r2["q"]
self.assertEqual(q._p_oid, q2._p_oid)
self.assertEqual(q2.value, 2)
q2.value = 5
tm2.commit()
# Back to T1. p and q still have the expected values.
rt = conn.root()
self.assertEqual(rt["p"].value, 3)
self.assertEqual(rt["q"].value, 2)
# Now do another subtxn commit in T1. This shouldn't change what
# T1 sees for p and q.
rt["r"] = P()
tm1.commit(True)
# Doing that subtxn commit in T1 should not process invalidations
# from T2's commit. p.value should still be 3 here (because that's
# what T1 subtxn-committed earlier), and q.value should still be 2.
# Prior to ZODB 3.2.9 and 3.4, q.value was 5 here.
rt = conn.root()
try:
self.assertEqual(rt["p"].value, 3)
self.assertEqual(rt["q"].value, 2)
finally:
tm1.abort()
def checkReadConflictErrorClearedDuringAbort(self):
# When a transaction is aborted, the "memory" of which
# objects were the cause of a ReadConflictError during
# that transaction should be cleared.
root = self._db.open(mvcc=False).root()
data = PersistentMapping({'d': 1})
root["data"] = data
transaction.commit()
# Provoke a ReadConflictError.
tm2 = transaction.TransactionManager()
cn2 = self._db.open(mvcc=False, transaction_manager=tm2)
r2 = cn2.root()
data2 = r2["data"]
data['d'] = 2
transaction.commit()
try:
data2['d'] = 3
except ReadConflictError:
pass
else:
self.fail("No conflict occurred")
# Explicitly abort cn2's transaction.
tm2.get().abort()
# cn2 should retain no memory of the read conflict after an abort(),
# but 3.2.3 had a bug wherein it did.
data_conflicts = data._p_jar._conflicts
data2_conflicts = data2._p_jar._conflicts
self.failIf(data_conflicts)
self.failIf(data2_conflicts) # this used to fail
# And because of that, we still couldn't commit a change to data2['d']
# in the new transaction.
cn2.sync() # process the invalidation for data2['d']
data2['d'] = 3
tm2.get().commit() # 3.2.3 used to raise ReadConflictError
cn2.close()
def checkTxnBeginImpliesAbort(self):
# begin() should do an abort() first, if needed.
cn = self._db.open()
rt = cn.root()
rt['a'] = 1
transaction.begin() # should abort adding 'a' to the root
rt = cn.root()
self.assertRaises(KeyError, rt.__getitem__, 'a')
# A longstanding bug: this didn't work if changes were only in
# subtransactions.
transaction.begin()
rt = cn.root()
rt['a'] = 2
transaction.commit(1)
transaction.begin()
rt = cn.root()
self.assertRaises(KeyError, rt.__getitem__, 'a')
# One more time, mixing "top level" and subtransaction changes.
transaction.begin()
rt = cn.root()
rt['a'] = 3
transaction.commit(1)
rt['b'] = 4
transaction.begin()
rt = cn.root()
self.assertRaises(KeyError, rt.__getitem__, 'a')
self.assertRaises(KeyError, rt.__getitem__, 'b')
# That used methods of the default transaction *manager*. Alas,
# that's not necessarily the same as using methods of the current
# transaction, and, in fact, when this test was written,
# Transaction.begin() didn't do anything (everything from here
# down failed).
# Oh, bleech. Since Transaction.begin is also deprecated, we have
# to goof around suppressing the deprecation warning.
import warnings
# First verify that Transaction.begin *is* deprecated, by turning
# the warning into an error.
warnings.filterwarnings("error", category=DeprecationWarning)
self.assertRaises(DeprecationWarning, transaction.get().begin)
del warnings.filters[0]
# Now ignore DeprecationWarnings for the duration. Use a
# try/finally block to ensure we reenable DeprecationWarnings
# no matter what.
warnings.filterwarnings("ignore", category=DeprecationWarning)
try:
cn = self._db.open()
rt = cn.root()
rt['a'] = 1
transaction.get().begin() # should abort adding 'a' to the root
rt = cn.root()
self.assertRaises(KeyError, rt.__getitem__, 'a')
# A longstanding bug: this didn't work if changes were only in
# subtransactions.
transaction.get().begin()
rt = cn.root()
rt['a'] = 2
transaction.get().commit(1)
transaction.get().begin()
rt = cn.root()
self.assertRaises(KeyError, rt.__getitem__, 'a')
# One more time, mixing "top level" and subtransaction changes.
transaction.get().begin()
rt = cn.root()
rt['a'] = 3
transaction.get().commit(1)
rt['b'] = 4
transaction.get().begin()
rt = cn.root()
self.assertRaises(KeyError, rt.__getitem__, 'a')
self.assertRaises(KeyError, rt.__getitem__, 'b')
cn.close()
finally:
del warnings.filters[0]
def checkFailingCommitSticks(self):
# See also checkFailingSubtransactionCommitSticks.
cn = self._db.open()
rt = cn.root()
rt['a'] = 1
# Arrange for commit to fail during tpc_vote.
poisoned = PoisonedObject(PoisonedJar(break_tpc_vote=True))
transaction.get().register(poisoned)
self.assertRaises(PoisonedError, transaction.get().commit)
# Trying to commit again fails too.
self.assertRaises(TransactionFailedError, transaction.get().commit)
self.assertRaises(TransactionFailedError, transaction.get().commit)
self.assertRaises(TransactionFailedError, transaction.get().commit)
# The change to rt['a'] is lost.
self.assertRaises(KeyError, rt.__getitem__, 'a')
# Trying to modify an object also fails, because Transaction.join()
# also raises TransactionFailedError.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via abort(), and try again.
transaction.get().abort()
rt['a'] = 1
transaction.get().commit()
self.assertEqual(rt['a'], 1)
# Cleaning up via begin() should also work.
rt['a'] = 2
transaction.get().register(poisoned)
self.assertRaises(PoisonedError, transaction.get().commit)
self.assertRaises(TransactionFailedError, transaction.get().commit)
# The change to rt['a'] is lost.
self.assertEqual(rt['a'], 1)
# Trying to modify an object also fails.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via begin(), and try again.
transaction.begin()
rt['a'] = 2
transaction.get().commit()
self.assertEqual(rt['a'], 2)
cn.close()
def checkFailingSubtransactionCommitSticks(self):
cn = self._db.open()
rt = cn.root()
rt['a'] = 1
transaction.get().commit(True)
self.assertEqual(rt['a'], 1)
rt['b'] = 2
# Subtransactions don't do tpc_vote, so we poison tpc_begin.
poisoned = PoisonedJar()
transaction.get().join(poisoned)
poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True)
# Trying to subtxn-commit again fails too.
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
# Top-level commit also fails.
self.assertRaises(TransactionFailedError, transaction.get().commit)
# The changes to rt['a'] and rt['b'] are lost.
self.assertRaises(KeyError, rt.__getitem__, 'a')
self.assertRaises(KeyError, rt.__getitem__, 'b')
# Trying to modify an object also fails, because Transaction.join()
# also raises TransactionFailedError.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via abort(), and try again.
transaction.get().abort()
rt['a'] = 1
transaction.get().commit()
self.assertEqual(rt['a'], 1)
# Cleaning up via begin() should also work.
rt['a'] = 2
poisoned = PoisonedJar()
transaction.get().join(poisoned)
poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True)
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
# The change to rt['a'] is lost.
self.assertEqual(rt['a'], 1)
# Trying to modify an object also fails.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via begin(), and try again.
transaction.begin()
rt['a'] = 2
transaction.get().commit(True)
self.assertEqual(rt['a'], 2)
transaction.get().commit()
cn2 = self._db.open()
rt = cn.root()
self.assertEqual(rt['a'], 2)
cn.close()
cn2.close()
class PoisonedError(Exception):
pass
# PoisonedJar arranges to raise exceptions from interesting places.
# For whatever reason, subtransaction commits don't call tpc_vote.
class PoisonedJar:
def __init__(self, break_tpc_begin=False, break_tpc_vote=False,
break_savepoint=False):
self.break_tpc_begin = break_tpc_begin
self.break_tpc_vote = break_tpc_vote
self.break_savepoint = break_savepoint
def sortKey(self):
return str(id(self))
# A way to poison a subtransaction commit.
def tpc_begin(self, *args):
if self.break_tpc_begin:
raise PoisonedError("tpc_begin fails")
# A way to poison a top-level commit.
def tpc_vote(self, *args):
if self.break_tpc_vote:
raise PoisonedError("tpc_vote fails")
def savepoint(self):
if self.break_savepoint:
raise PoisonedError("savepoint fails")
def commit(*args):
pass
def abort(*self):
pass
class PoisonedObject:
def __init__(self, poisonedjar):
self._p_jar = poisonedjar
def test_suite():
return unittest.makeSuite(ZODBTests, 'check')
if __name__ == "__main__":
unittest.main(defaultTest="test_suite")
suite = unittest.makeSuite(NEOZODBTests, 'check')
unittest.TextTestRunner().run(suite)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment