Commit 65d90cfd authored by Grégory Wisniewski's avatar Grégory Wisniewski

Update conflict resolution test.

Ensure that ConflictError is raised if _p_resolveConflict is not implemented.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1518 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 80948fb7
......@@ -18,6 +18,7 @@
import unittest
import transaction
import ZODB
from ZODB.POSException import ConflictError
from Persistence import Persistent
from neo.tests.functional import NEOCluster, NEOFunctionalTest
......@@ -35,6 +36,9 @@ class PCounter(Persistent):
def inc(self):
self._value += 1
class PCounterWithResolution(PCounter):
def _p_resolveConflict(self, old, saved, new):
new['_value'] = saved['_value'] + new['_value']
return new
......@@ -68,36 +72,57 @@ class ClientTests(NEOFunctionalTest):
root = self.db.open(transaction_manager=txn).root()
return (txn, root)
def testConflictResolution(self):
def testConflictResolutionTriggered(self):
""" Check that ConflictError is raised on write conflict """
# create the initial objects
self.__setup()
# create the initial object
t, r = self.makeTransaction()
r['counter'] = PCounter()
r['without_resolution'] = PCounter()
t.commit()
# two concurrent transactions modify it
# first with no conflict resolution
t1, r1 = self.makeTransaction()
t2, r2 = self.makeTransaction()
o1, o2 = r1['counter'], r2['counter']
o1 = r1['without_resolution']
o2 = r2['without_resolution']
self.assertEqual(o1.value(), 0)
self.assertEqual(o2.value(), 0)
o1.inc()
o2.inc()
o2.inc()
# the first commit
t1.commit()
self.assertEqual(o1.value(), 1)
self.assertEqual(o2.value(), 2)
self.assertRaises(ConflictError, t2.commit)
# and the second (conflict triggered, resolution happen)
def testConflictResolutionTriggered(self):
""" Check that conflict resolution works """
# create the initial objects
self.__setup()
t, r = self.makeTransaction()
r['with_resolution'] = PCounterWithResolution()
t.commit()
# then with resolution
t1, r1 = self.makeTransaction()
t2, r2 = self.makeTransaction()
o1 = r1['with_resolution']
o2 = r2['with_resolution']
self.assertEqual(o1.value(), 0)
self.assertEqual(o2.value(), 0)
o1.inc()
o2.inc()
o2.inc()
t1.commit()
self.assertEqual(o1.value(), 1)
self.assertEqual(o2.value(), 2)
t2.commit()
t1.begin()
t2.begin()
# object graph view must be consistent
self.assertEqual(o2.value(), 3)
self.assertEqual(o1.value(), 3)
def test_suite():
return unittest.makeSuite(ClientTests)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment