Commit 7dc3b911 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Abort a transaction synchronously, so that the status of connections and...

Abort a transaction synchronously, so that the status of connections and databases are updated after abort. This is a hack, but should work fine with all versions of Zope.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@14517 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 5a25f0f8
......@@ -48,6 +48,40 @@ INVALID_ORDER = 2
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 30 # in seconds
def abortTransactionSynchronously():
"""Abort a transaction in a synchronous manner.
Manual invocation of transaction abort does not synchronize
connections with databases, thus invalidations are not cleared out.
This may cause an infinite loop, because a read conflict error happens
again and again on the same object.
So, in this method, collect (potential) Connection objects used
for current transaction, and invoke the sync method on every Connection
object, then abort the transaction. In most cases, aborting the
transaction is redundant, because sync should call abort implicitly.
But if no connection is present, it is still required to call abort
explicitly, and it does not cause any harm to call abort more than once.
XXX this is really a hack. This touches the internal code of Transaction.
"""
try:
import transaction
# Zope 2.8 and later.
manager_list = transaction.get()._adapters.keys()
for manager in manager_list:
if hasattr(manager, 'sync'):
manager.sync()
transaction.abort()
except ImportError:
# Zope 2.7 and earlier.
t = get_transaction()
jar_list = t._get_jars(t._objects, 0)
for jar in jar_list:
if hasattr(jar, 'sync'):
jar.sync()
t.abort()
class Queue:
"""
Step 1: use lists
......
......@@ -28,7 +28,8 @@
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously
from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
......@@ -275,14 +276,14 @@ class SQLDict(RAMDict):
get_transaction().commit()
break
else:
get_transaction().abort()
abortTransactionSynchronously()
except:
LOG('SQLDict', ERROR,
'an uncatched exception happened during processing %r' % (uid_list_list,),
error=sys.exc_info())
# If an exception occurs, abort the transaction to minimize the impact,
try:
get_transaction().abort()
abortTransactionSynchronously()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', WARNING,
......
......@@ -29,7 +29,8 @@
from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue
from DateTime import DateTime
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
......@@ -118,7 +119,7 @@ class SQLQueue(RAMQueue):
except:
# If an exception occurs, abort the transaction to minimize the impact,
try:
get_transaction().abort()
abortTransactionSynchronously()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
......@@ -142,7 +143,7 @@ class SQLQueue(RAMQueue):
else:
try:
# If not, abort transaction and start a new one
get_transaction().abort()
abortTransactionSynchronously()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
......
......@@ -1751,6 +1751,49 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ',0,message)
self.checkIsMessageRegisteredMethod('SQLDict')
def test_79_AbortTransactionSynchronously(self, quiet=0, run=run_all_test):
"""
This test tests if abortTransactionSynchronously really aborts
a transaction synchronously.
"""
if not run: return
if not quiet:
message = '\nTest Aborting Transaction Synchronously'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
# Make a new persistent object, and commit it so that an oid gets
# assigned.
module = self.getOrganisationModule()
organisation = module.newContent(portal_type = 'Organisation')
organisation_id = organisation.getId()
get_transaction().commit()
organisation = module[organisation_id]
# Now fake a read conflict.
from ZODB.POSException import ReadConflictError
tid = organisation._p_serial
oid = organisation._p_oid
conn = organisation._p_jar
if getattr(conn, '_mvcc', 0):
conn._mvcc = 0 # XXX disable MVCC forcibly
try:
conn.db().invalidate({oid: tid})
except TypeError:
conn.db().invalidate(tid, {oid: tid})
conn._cache.invalidate(oid)
# Usual abort should not remove a read conflict error.
organisation = module[organisation_id]
self.assertRaises(ReadConflictError, getattr, organisation, 'uid')
get_transaction().abort()
self.assertRaises(ReadConflictError, getattr, organisation, 'uid')
# Synchronous abort.
from Products.CMFActivity.Activity.Queue import abortTransactionSynchronously
abortTransactionSynchronously()
getattr(organisation, 'uid')
if __name__ == '__main__':
framework()
else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment