Commit b55075f7 authored by Julien Muchembled's avatar Julien Muchembled

client: do not clear the entire connection cache in tpc_finish !

This fixes a regression introduced by implementation of IMVCCStorage in r2532.
On recent ZODB, this fixes a severe performance issue.
With ZODB 3.4, objects were never invalidated, which was even worse.

This fixes includes compatibility code for ZODB 3.4 so that each connection
has its own NEOStorage instance.
The DB's storage is changed to always consider the last revision of
objects.

IMVCCStorage seems too complicated to me. Connection should be a better
place to implement it (by extending/fixing the 'before' attribute).
So we have decided that NEOStorage stops implementing IMVCCStorage.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2802 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 016a1c8d
......@@ -53,7 +53,6 @@ class Storage(BaseStorage.BaseStorage,
ZODB.interfaces.IStorageUndoable,
getattr(ZODB.interfaces, 'IExternalGC', None), # XXX ZODB < 3.9
getattr(ZODB.interfaces, 'ReadVerifyingStorage', None), # XXX ZODB 3.9
getattr(ZODB.interfaces, 'IMVCCStorage', None), # XXX ZODB < 3.9
)))
def __init__(self, master_nodes, name, read_only=False,
......@@ -71,6 +70,8 @@ class Storage(BaseStorage.BaseStorage,
self._is_read_only = read_only
if _app is None:
_app = Application(master_nodes, name, compress=compress)
# always read the last revision when not bound to a connection
self._getSnapshotTID = lambda: None
self.app = _app
# Used to clone self (see new_instance & IMVCCStorage definition).
self._init_args = (master_nodes, name)
......@@ -136,14 +137,11 @@ class Storage(BaseStorage.BaseStorage,
@check_read_only
def tpc_abort(self, transaction):
self.sync()
return self.app.tpc_abort(transaction=transaction)
def tpc_finish(self, transaction, f=None):
result = self.app.tpc_finish(transaction=transaction,
return self.app.tpc_finish(transaction=transaction,
tryToResolveConflict=self.tryToResolveConflict, f=f)
self.sync()
return result
@check_read_only
def store(self, oid, serial, data, version, transaction):
......@@ -281,12 +279,3 @@ class Storage(BaseStorage.BaseStorage,
def new_instance(self):
return Storage(*self._init_args, **self._init_kw)
def poll_invalidations(self):
"""
Nothing to do, NEO doesn't need any polling.
"""
pass
release = sync
......@@ -50,3 +50,37 @@ if needs_patch:
Connection.tpc_finish = tpc_finish
class _DB(object):
"""
Wrapper to DB instance that properly initialize Connection objects
with NEO storages.
It forces the connection to always create a new instance of the
storage, for compatibility with ZODB 3.4, and because we don't
implement IMVCCStorage completely.
"""
def __new__(cls, db, connection):
if db._storage.__class__.__module__ != 'neo.client.Storage':
return db
self = object.__new__(cls)
self._db = db
self._connection = connection
return self
def __getattr__(self, attr):
result = getattr(self._db, attr)
if attr in ('storage', '_storage'):
result = result.new_instance()
self._connection._db = self._db
setattr(self, attr, result)
return result
try:
Connection_setDB = Connection._setDB
except AttributeError: # recent ZODB
Connection_init = Connection.__init__
Connection.__init__ = lambda self, db, *args, **kw: \
Connection_init(self, _DB(db, self), *args, **kw)
else: # old ZODB (e.g. ZODB 3.4)
Connection._setDB = lambda self, odb, *args, **kw: \
Connection_setDB(self, _DB(odb, self), *args, **kw)
......@@ -881,7 +881,7 @@ class Application(object):
undo_serial = None
self._store(txn_context, oid, current_serial, data, undo_serial)
return None, () # required for ZODB < 3.10
return None, txn_oid_list
def _insertMetadata(self, txn_info, extension):
for k, v in loads(extension).items():
......
......@@ -360,7 +360,8 @@ class DatabaseManager(object):
p64 = util.p64
oid = u64(oid)
tid = u64(tid)
ltid = u64(ltid)
if ltid:
ltid = u64(ltid)
undone_tid = u64(undone_tid)
_getDataTID = self._getDataTID
if transaction_object is not None:
......
......@@ -118,33 +118,6 @@ class ClientTests(NEOFunctionalTest):
self.assertEqual(o2.value(), 2)
self.assertRaises(ConflictError, t2.commit)
def testConflictResolutionTriggered2(self):
""" Check that conflict resolution works """
# create the initial objects
self.__setup()
t, c = self.makeTransaction()
c.root()['with_resolution'] = PCounterWithResolution()
t.commit()
# then with resolution
t1, c1 = self.makeTransaction()
t2, c2 = self.makeTransaction()
o1 = c1.root()['with_resolution']
o2 = c2.root()['with_resolution']
self.assertEqual(o1.value(), 0)
self.assertEqual(o2.value(), 0)
o1.inc()
o2.inc()
o2.inc()
t1.commit()
self.assertEqual(o1.value(), 1)
self.assertEqual(o2.value(), 2)
t2.commit()
t1.begin()
t2.begin()
self.assertEqual(o2.value(), 3)
self.assertEqual(o1.value(), 3)
def testIsolationAtZopeLevel(self):
""" Check transaction isolation within zope connection """
self.__setup()
......
......@@ -17,23 +17,58 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from persistent import Persistent
from neo.lib.protocol import NodeStates
from neo.lib.protocol import NodeStates, ZERO_TID
from neo.tests.threaded import NEOCluster, NEOThreadedTest
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
class PObject(Persistent):
pass
class PCounter(Persistent):
value = 0
class PCounterWithResolution(PCounter):
def _p_resolveConflict(self, old, saved, new):
new['value'] += saved['value'] - old.get('value', 0)
return new
class Test(NEOThreadedTest):
def test_commit(self):
def testConflictResolutionTriggered2(self):
""" Check that conflict resolution works """
cluster = NEOCluster()
cluster.start()
try:
# create the initial object
t, c = cluster.getTransaction()
c.root()['foo'] = PObject()
c.root()['with_resolution'] = ob = PCounterWithResolution()
t.commit()
self.assertEqual(ob._p_changed, 0)
tid1 = ob._p_serial
self.assertNotEqual(tid1, ZERO_TID)
del ob, t, c
# then check resolution
t1, c1 = cluster.getTransaction()
t2, c2 = cluster.getTransaction()
o1 = c1.root()['with_resolution']
o2 = c2.root()['with_resolution']
self.assertEqual(o1.value, 0)
self.assertEqual(o2.value, 0)
o1.value += 1
o2.value += 2
t1.commit()
self.assertEqual(o1._p_changed, 0)
tid2 = o1._p_serial
self.assertTrue(tid1 < tid2)
self.assertEqual(o1.value, 1)
self.assertEqual(o2.value, 2)
t2.commit()
self.assertEqual(o2._p_changed, None)
t1.begin()
t2.begin()
self.assertEqual(o2.value, 3)
self.assertEqual(o1.value, 3)
tid3 = o1._p_serial
self.assertTrue(tid2 < tid3)
self.assertEqual(tid3, o2._p_serial)
finally:
cluster.stop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment