Commit 6ab7cd80 authored by Vincent Pelletier's avatar Vincent Pelletier

Implement forced invalidation processing at transaction begin.

This fixes problems in multi-database setups (ie. ZODB and other
databases).
For example, imagine a transaction stores an object in ZODB and its
identifier in a relational database.
Then, another process connected to the same databases and polling the
relational database find the new entry, and tried to load the object from
ZODB.
If the second node didn't process invalidation sent by initial transaction
when it starts its transaction, second client would not be able to see that
object, which should not happen (inter-database inconsistency).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2329 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 4323acc0
......@@ -63,11 +63,6 @@ RC - Review output of pylint (CODE)
Also, the code to detect wether a response is expected or not must be
genericised and moved out of handlers.
- Pack (FEATURE)
- Control that client processed all invalidations before starting a
transaction (CONSISTENCY)
If a client starts a transaction before it received an invalidation
message caused by a transaction commited, it will use outdated data.
This is a bug known in Zeo.
- Factorise node initialisation for admin, client and storage (CODE)
The same code to ask/receive node list and partition table exists in too
many places.
......
......@@ -193,3 +193,19 @@ class Storage(BaseStorage.BaseStorage,
# cluster. For now make this a no-op.
pass
def invalidationBarrier(self):
self.app.invalidationBarrier()
# Monkey-patch ZODB.Connection to fetch all invalidations before starting a
# transaction.
from ZODB.Connection import Connection
# XXX: a better detection should be done if this patch enters ZODB
INVALIDATION_MARKER = '__INVALIDATION_BARRIER_PATCH_IS_HERE__'
if not hasattr(Connection, INVALIDATION_MARKER):
orig_newTransaction = Connection.newTransaction
def newTransaction(self, *ignored):
getattr(self._storage, 'invalidationBarrier', lambda: None)()
orig_newTransaction(self, *ignored)
Connection.newTransaction = newTransaction
setattr(Connection, INVALIDATION_MARKER, True)
......@@ -558,7 +558,7 @@ class Application(object):
# Those invalidations are checked at ZODB level, so it decides if
# loaded data can be handed to current transaction or if a separate
# loadBefore call is required.
self._askPrimary(Packets.AskBarrier())
self.invalidationBarrier()
return result
finally:
self._load_lock_release()
......@@ -1191,6 +1191,9 @@ class Application(object):
self.poll_thread.stop()
close = __del__
def invalidationBarrier(self):
self._askPrimary(Packets.AskBarrier())
def sync(self):
self._waitAnyMessage(False)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment