Commit d87f899b authored by Julien Muchembled's avatar Julien Muchembled

client: fix processing of invalidations older than snapshot tid

Also patch ZODB to fix an invalidation bug.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2812 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 76c85aa3
...@@ -39,6 +39,15 @@ class Storage(BaseStorage.BaseStorage, ...@@ -39,6 +39,15 @@ class Storage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage): ConflictResolution.ConflictResolvingStorage):
"""Wrapper class for neoclient.""" """Wrapper class for neoclient."""
# Stores the highest TID visible for current transaction.
# First call sets this snapshot by asking master node most recent
# committed TID.
# As a (positive) side-effect, this forces us to handle all pending
# invalidations, so we get a very recent view of the database (which is
# good when multiple databases are used in the same program with some
# amount of referential integrity).
# Should remain None when not bound to a connection,
# so that it always read the last revision.
_snapshot_tid = None _snapshot_tid = None
implements(*filter(None, ( implements(*filter(None, (
...@@ -70,8 +79,6 @@ class Storage(BaseStorage.BaseStorage, ...@@ -70,8 +79,6 @@ class Storage(BaseStorage.BaseStorage,
self._is_read_only = read_only self._is_read_only = read_only
if _app is None: if _app is None:
_app = Application(master_nodes, name, compress=compress) _app = Application(master_nodes, name, compress=compress)
# always read the last revision when not bound to a connection
self._getSnapshotTID = lambda: None
self.app = _app self.app = _app
# Used to clone self (see new_instance & IMVCCStorage definition). # Used to clone self (see new_instance & IMVCCStorage definition).
self._init_args = (master_nodes, name) self._init_args = (master_nodes, name)
...@@ -87,34 +94,13 @@ class Storage(BaseStorage.BaseStorage, ...@@ -87,34 +94,13 @@ class Storage(BaseStorage.BaseStorage,
def _cache(self): def _cache(self):
return self.app._cache return self.app._cache
def _getSnapshotTID(self):
"""
Get the highest TID visible for current transaction.
First call sets this snapshot by asking master node most recent
committed TID.
As a (positive) side-effect, this forces us to handle all pending
invalidations, so we get a very recent view of the database (which is
good when multiple databases are used in the same program with some
amount of referential integrity).
"""
tid = self._snapshot_tid
if tid is None:
tid = self.lastTransaction()
if tid is ZERO_TID:
raise NEOStorageDoesNotExistError('No transaction in storage')
# Increment by one, as we will use this as an excluded upper
# bound (loadBefore).
tid = add64(tid, 1)
self._snapshot_tid = tid
return tid
def load(self, oid, version=''): def load(self, oid, version=''):
# XXX: interface definition states that version parameter is # XXX: interface definition states that version parameter is
# mandatory, while some ZODB tests do not provide it. For now, make # mandatory, while some ZODB tests do not provide it. For now, make
# it optional. # it optional.
assert version == '', 'Versions are not supported' assert version == '', 'Versions are not supported'
try: try:
return self.app.load(oid, None, self._getSnapshotTID())[:2] return self.app.load(oid, None, self._snapshot_tid)[:2]
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSException.POSKeyError(oid)
...@@ -181,7 +167,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -181,7 +167,7 @@ class Storage(BaseStorage.BaseStorage,
# undo # undo
@check_read_only @check_read_only
def undo(self, transaction_id, txn): def undo(self, transaction_id, txn):
return self.app.undo(self._getSnapshotTID(), undone_tid=transaction_id, return self.app.undo(self._snapshot_tid, undone_tid=transaction_id,
txn=txn, tryToResolveConflict=self.tryToResolveConflict) txn=txn, tryToResolveConflict=self.tryToResolveConflict)
...@@ -205,7 +191,7 @@ class Storage(BaseStorage.BaseStorage, ...@@ -205,7 +191,7 @@ class Storage(BaseStorage.BaseStorage,
def loadEx(self, oid, version): def loadEx(self, oid, version):
try: try:
data, serial, _ = self.app.load(oid, None, self._getSnapshotTID()) data, serial, _ = self.app.load(oid, None, self._snapshot_tid)
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
raise POSException.POSKeyError(oid) raise POSException.POSKeyError(oid)
return data, serial, '' return data, serial, ''
...@@ -223,7 +209,12 @@ class Storage(BaseStorage.BaseStorage, ...@@ -223,7 +209,12 @@ class Storage(BaseStorage.BaseStorage,
raise KeyError raise KeyError
def sync(self, force=True): def sync(self, force=True):
self._snapshot_tid = None # XXX: Unfortunately, we're quite slow (lastTransaction) and
# we're also called at the end of each transaction by ZODB
# (see Connection.afterCompletion), probably for no useful reason.
# Increment by one, as we will use this as an excluded upper
# bound (loadBefore).
self._snapshot_tid = add64(self.lastTransaction(), 1)
def copyTransactionsFrom(self, source, verbose=False): def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """ """ Zope compliant API """
......
...@@ -50,6 +50,45 @@ if needs_patch: ...@@ -50,6 +50,45 @@ if needs_patch:
Connection.tpc_finish = tpc_finish Connection.tpc_finish = tpc_finish
try:
if Connection._nexedi_fix != 1:
raise Exception("A different ZODB fix is already applied")
except AttributeError:
Connection._nexedi_fix = 1
# Whenever an connection is opened (and there's usually an existing one
# in DB pool that can be reused) whereas the transaction is already
# started, we must make sure that proper storage setup is done by
# calling Connection.newTransaction.
# For example, there's no open transaction when a ZPublisher/Publish
# transaction begins.
def open(self, *args, **kw):
def _flush_invalidations():
acquire = self._db._a
try:
self._db._r()
except thread.error:
acquire = lambda: None
try:
del self._flush_invalidations
self.newTransaction()
finally:
acquire()
self._flush_invalidations = _flush_invalidations
self._flush_invalidations = _flush_invalidations
try:
Connection_open(self, *args, **kw)
finally:
del self._flush_invalidations
try:
Connection_open = Connection._setDB
Connection._setDB = open
except AttributeError: # recent ZODB
Connection_open = Connection.open
Connection.open = open
class _DB(object): class _DB(object):
""" """
Wrapper to DB instance that properly initialize Connection objects Wrapper to DB instance that properly initialize Connection objects
......
...@@ -30,6 +30,7 @@ import tempfile ...@@ -30,6 +30,7 @@ import tempfile
import traceback import traceback
import threading import threading
import psutil import psutil
import transaction
import neo.scripts import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException from neo.neoctl.neoctl import NeoCTL, NotReadyException
...@@ -654,6 +655,13 @@ class NEOCluster(object): ...@@ -654,6 +655,13 @@ class NEOCluster(object):
class NEOFunctionalTest(NeoTestBase): class NEOFunctionalTest(NeoTestBase):
def tearDown(self):
# Kill all unfinished transactions for next test.
# Note we don't even abort them because it may require a valid
# connection to a master node (see Storage.sync()).
transaction.manager.__init__()
NeoTestBase.tearDown(self)
def setupLog(self): def setupLog(self):
log_file = os.path.join(self.getTempDirectory(), 'test.log') log_file = os.path.join(self.getTempDirectory(), 'test.log')
setupLog('TEST', log_file, True) setupLog('TEST', log_file, True)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment