Commit efea73eb authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add a test to check if an abort cancel the delayed stores.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2152 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 989d1e61
...@@ -58,6 +58,8 @@ class PCounterWithResolution(PCounter): ...@@ -58,6 +58,8 @@ class PCounterWithResolution(PCounter):
new['_value'] = saved['_value'] + new['_value'] new['_value'] = saved['_value'] + new['_value']
return new return new
class PObject(Persistent):
pass
class ClientTests(NEOFunctionalTest): class ClientTests(NEOFunctionalTest):
...@@ -230,8 +232,6 @@ class ClientTests(NEOFunctionalTest): ...@@ -230,8 +232,6 @@ class ClientTests(NEOFunctionalTest):
def testLockTimeout(self): def testLockTimeout(self):
""" Hold a lock on an object to block a second transaction """ """ Hold a lock on an object to block a second transaction """
def test(): def test():
class PObject(Persistent):
pass
self.neo = NEOCluster(['test_neo1'], replicas=0, self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory()) temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL() neoctl = self.neo.getNEOCTL()
...@@ -254,6 +254,52 @@ class ClientTests(NEOFunctionalTest): ...@@ -254,6 +254,52 @@ class ClientTests(NEOFunctionalTest):
self.assertRaises(ConflictError, st2.tpc_vote, t2) self.assertRaises(ConflictError, st2.tpc_vote, t2)
self.runWithTimeout(test, 40) self.runWithTimeout(test, 40)
def testDelayedLocksCancelled(self):
"""
Hold a lock on an object, try to get another lock on the same
object to delay it. Then cancel the second transaction and check
that the lock is not hold when the first transaction ends
"""
def test():
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
db2, conn2 = self.neo.getZODBConnection()
st1, st2 = conn1._storage, conn2._storage
t1, t2 = transaction.Transaction(), transaction.Transaction()
t1.user = t2.user = 'user'
t1.description = t2.description = 'desc'
oid = st1.new_oid()
rev = '\0' * 8
data = zodb_pickle(PObject())
st1.tpc_begin(t1)
st2.tpc_begin(t2)
# t1 own the lock
st1.store(oid, rev, data, '', t1)
# t2 store is delayed
st2.store(oid, rev, data, '', t2)
# cancel t2, should cancel the store too
st2.tpc_abort(t2)
# finish t1, should release the lock
st1.tpc_vote(t1)
st1.tpc_finish(t1)
db3, conn3 = self.neo.getZODBConnection()
st3 = conn3._storage
t3 = transaction.Transaction()
t3.user = 'user'
t3.description = 'desc'
st3.tpc_begin(t3)
# retreive the last revision
data, serial = st3.load(oid)
# try to store again, should not be delayed
st3.store(oid, serial, data, '', t3)
# the vote should not timeout
st3.tpc_vote(t3)
st3.tpc_finish(t3)
self.runWithTimeout(test, 10)
def test_suite(): def test_suite():
return unittest.makeSuite(ClientTests) return unittest.makeSuite(ClientTests)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment