Commit aab687e8 authored by Barry Warsaw's avatar Barry Warsaw

checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(): New method

which tests a particular combination of packing and transactional undo
at the ZODB layer.
parent 56b13d6e
...@@ -9,12 +9,17 @@ from ZODB import POSException ...@@ -9,12 +9,17 @@ from ZODB import POSException
from ZODB.Transaction import Transaction from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from ZODB.utils import u64 from ZODB.utils import u64
from ZODB import DB
from Persistence import Persistent
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
ZERO = '\0'*8 ZERO = '\0'*8
class C(Persistent):
pass
class TransactionalUndoStorage: class TransactionalUndoStorage:
def _transaction_begin(self): def _transaction_begin(self):
...@@ -461,3 +466,58 @@ class TransactionalUndoStorage: ...@@ -461,3 +466,58 @@ class TransactionalUndoStorage:
data, revid = self._storage.load(oid, '') data, revid = self._storage.load(oid, '')
# The object must now be at the second state # The object must now be at the second state
eq(zodb_unpickle(data), MinPO(52)) eq(zodb_unpickle(data), MinPO(52))
def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self):
eq = self.assertEqual
db = DB(self._storage)
conn = db.open()
root = conn.root()
o1 = C()
o2 = C()
root['obj'] = o1
o1.obj = o2
txn = get_transaction()
txn.note('o1 -> o2')
txn.commit()
now = packtime = time.time()
while packtime <= now:
packtime = time.time()
o3 = C()
o2.obj = o3
txn = get_transaction()
txn.note('o1 -> o2 -> o3')
txn.commit()
o1.obj = o3
txn = get_transaction()
txn.note('o1 -> o3')
txn.commit()
log = self._storage.undoLog()
eq(len(log), 4)
for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3',
'o1 -> o2', 'initial database creation')):
eq(entry[0]['description'], entry[1])
self._storage.pack(packtime, referencesf)
log = self._storage.undoLog()
for entry in zip(log, ('o1 -> o3', 'o1 -> o2 -> o3')):
eq(entry[0]['description'], entry[1])
tid = log[0]['id']
db.undo(tid)
txn = get_transaction()
txn.note('undo')
txn.commit()
# undo does a txn-undo, but doesn't invalidate
conn.sync()
log = self._storage.undoLog()
for entry in zip(log, ('undo', 'o1 -> o3', 'o1 -> o2 -> o3')):
eq(entry[0]['description'], entry[1])
eq(o1.obj, o2)
eq(o1.obj.obj, o3)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment