Remove Zope 2.7 compatibility fossils

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@37189 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 73f645cd
...@@ -33,10 +33,7 @@ from ZODB.POSException import ConflictError ...@@ -33,10 +33,7 @@ from ZODB.POSException import ConflictError
import sha import sha
from cStringIO import StringIO from cStringIO import StringIO
try: import transaction
from transaction import get as get_transaction
except ImportError:
pass
# Error values for message validation # Error values for message validation
EXCEPTION = -1 EXCEPTION = -1
...@@ -48,36 +45,6 @@ INVALID_ORDER = 2 ...@@ -48,36 +45,6 @@ INVALID_ORDER = 2
MAX_PROCESSING_TIME = 900 # in seconds MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 30 # in seconds VALIDATION_ERROR_DELAY = 30 # in seconds
def abortTransactionSynchronously():
"""Abort a transaction in a synchronous manner.
Manual invocation of transaction abort does not synchronize
connections with databases, thus invalidations are not cleared out.
This may cause an infinite loop, because a read conflict error happens
again and again on the same object.
So, in this method, collect (potential) Connection objects used
for current transaction, and invoke the sync method on every Connection
object, then abort the transaction. In most cases, aborting the
transaction is redundant, because sync should call abort implicitly.
But if no connection is present, it is still required to call abort
explicitly, and it does not cause any harm to call abort more than once.
XXX this is really a hack. This touches the internal code of Transaction.
"""
try:
import transaction
# Zope 2.8 and later. sync is automatic.
transaction.abort()
except ImportError:
# Zope 2.7 and earlier.
t = get_transaction()
jar_list = t._get_jars(t._objects, 0)
for jar in jar_list:
if getattr(jar, 'sync', None) is not None:
jar.sync()
t.abort()
class Queue: class Queue:
""" """
Step 1: use lists Step 1: use lists
...@@ -228,7 +195,7 @@ class Queue: ...@@ -228,7 +195,7 @@ class Queue:
cached_result = validation_text_dict.get(message.order_validation_text) cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None: if cached_result is None:
message_list = message.getDependentMessageList(self, activity_tool) message_list = message.getDependentMessageList(self, activity_tool)
get_transaction().commit() # Release locks. transaction.commit() # Release locks.
if message_list: if message_list:
# The result is not empty, so this message is not executable. # The result is not empty, so this message is not executable.
validation_text_dict[message.order_validation_text] = 0 validation_text_dict[message.order_validation_text] = 0
......
...@@ -32,10 +32,7 @@ from Queue import Queue, VALID ...@@ -32,10 +32,7 @@ from Queue import Queue, VALID
from zLOG import LOG from zLOG import LOG
try: import transaction
from transaction import get as get_transaction
except ImportError:
pass
class RAMDict(Queue): class RAMDict(Queue):
""" """
...@@ -88,11 +85,11 @@ class RAMDict(Queue): ...@@ -88,11 +85,11 @@ class RAMDict(Queue):
activity_tool.invoke(m) activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
del self.getDict(path)[key] del self.getDict(path)[key]
get_transaction().commit() transaction.commit()
return 0 return 0
else: else:
# Start a new transaction and keep on to next message # Start a new transaction and keep on to next message
get_transaction().commit() transaction.commit()
return 1 return 1
def countMessage(self, activity_tool,path=None,method_id=None,**kw): def countMessage(self, activity_tool,path=None,method_id=None,**kw):
......
...@@ -29,10 +29,7 @@ ...@@ -29,10 +29,7 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Queue import Queue, VALID from Queue import Queue, VALID
try: import transaction
from transaction import get as get_transaction
except ImportError:
pass
class RAMQueue(Queue): class RAMQueue(Queue):
""" """
...@@ -67,16 +64,16 @@ class RAMQueue(Queue): ...@@ -67,16 +64,16 @@ class RAMQueue(Queue):
for m in self.getQueue(path): for m in self.getQueue(path):
if m.validate(self, activity_tool) is not VALID: if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction transaction.commit() # Start a new transaction
return 0 # Keep on ticking return 0 # Keep on ticking
activity_tool.invoke(m) activity_tool.invoke(m)
if m.getExecutionState() == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction transaction.commit() # Start a new transaction
return 0 # Keep on ticking return 0 # Keep on ticking
else: else:
# Start a new transaction and keep on to next message # Start a new transaction and keep on to next message
get_transaction().commit() transaction.commit()
return 1 # Go to sleep return 1 # Go to sleep
def countMessage(self, activity_tool,path=None,method_id=None,**kw): def countMessage(self, activity_tool,path=None,method_id=None,**kw):
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
############################################################################## ##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH, abortTransactionSynchronously from Queue import VALID, INVALID_PATH
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
...@@ -40,10 +40,7 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import ( ...@@ -40,10 +40,7 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable) ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
try: import transaction
from transaction import get as get_transaction
except ImportError:
pass
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
...@@ -334,7 +331,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -334,7 +331,7 @@ class SQLDict(RAMDict, SQLBase):
# version - to ZODB connector. # version - to ZODB connector.
# So all connectors must be committed now that we have selected # So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects. # everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit() transaction.commit()
tv = getTransactionalVariable(None) tv = getTransactionalVariable(None)
tv['activity_runtime_environment'] = activity_runtime_environment tv['activity_runtime_environment'] = activity_runtime_environment
# Try to invoke # Try to invoke
...@@ -343,7 +340,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -343,7 +340,7 @@ class SQLDict(RAMDict, SQLBase):
except: except:
LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info()) LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
try: try:
abortTransactionSynchronously() transaction.abort()
except: except:
# Unfortunately, database adapters may raise an exception against abort. # Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', PANIC, LOG('SQLDict', PANIC,
...@@ -360,18 +357,18 @@ class SQLDict(RAMDict, SQLBase): ...@@ -360,18 +357,18 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list)) LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
# Abort if something failed. # Abort if something failed.
if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]: if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
endTransaction = abortTransactionSynchronously endTransaction = transaction.abort
else: else:
endTransaction = get_transaction().commit endTransaction = transaction.commit
try: try:
endTransaction() endTransaction()
except: except:
LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info()) LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
if endTransaction == abortTransactionSynchronously: if endTransaction == transaction.abort:
LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.') LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
else: else:
try: try:
abortTransactionSynchronously() transaction.abort()
except: except:
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.') LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
raise raise
...@@ -385,7 +382,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -385,7 +382,7 @@ class SQLDict(RAMDict, SQLBase):
else: else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, )) LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict) self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
get_transaction().commit() transaction.commit()
return not message_list return not message_list
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None): def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
...@@ -505,7 +502,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -505,7 +502,7 @@ class SQLDict(RAMDict, SQLBase):
offset=offset, count=READ_MESSAGE_LIMIT) offset=offset, count=READ_MESSAGE_LIMIT)
if not result: if not result:
return return
get_transaction().commit() transaction.commit()
validation_text_dict = {'none': 1} validation_text_dict = {'none': 1}
message_dict = {} message_dict = {}
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH, abortTransactionSynchronously from Queue import VALID, INVALID_PATH
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
...@@ -40,10 +40,7 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import ( ...@@ -40,10 +40,7 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable) ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
try: import transaction
from transaction import get as get_transaction
except ImportError:
pass
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
...@@ -214,7 +211,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -214,7 +211,7 @@ class SQLQueue(RAMQueue, SQLBase):
# version - to ZODB connector. # version - to ZODB connector.
# So all connectors must be committed now that we have selected # So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects. # everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit() transaction.commit()
tv = getTransactionalVariable(None) tv = getTransactionalVariable(None)
for m in message_list: for m in message_list:
tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m) tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
...@@ -227,15 +224,15 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -227,15 +224,15 @@ class SQLQueue(RAMQueue, SQLBase):
# successfull messages to be rolled back. This commit might fail, # successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the # so it is protected the same way as activity execution by the
# same "try" block. # same "try" block.
get_transaction().commit() transaction.commit()
else: else:
# This message failed, revert. # This message failed, abort.
abortTransactionSynchronously() transaction.abort()
except: except:
value = m.uid, m.object_path, m.method_id value = m.uid, m.object_path, m.method_id
LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info()) LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
try: try:
abortTransactionSynchronously() transaction.abort()
except: except:
# Unfortunately, database adapters may raise an exception against abort. # Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally') LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
...@@ -266,7 +263,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -266,7 +263,7 @@ class SQLQueue(RAMQueue, SQLBase):
LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, )) LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
self.finalizeMessageExecution(activity_tool, self.finalizeMessageExecution(activity_tool,
message_list[:processed_count]) message_list[:processed_count])
get_transaction().commit() transaction.commit()
return not message_list return not message_list
...@@ -396,7 +393,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -396,7 +393,7 @@ class SQLQueue(RAMQueue, SQLBase):
offset=offset, count=READ_MESSAGE_LIMIT) offset=offset, count=READ_MESSAGE_LIMIT)
if not result: if not result:
return return
get_transaction().commit() transaction.commit()
validation_text_dict = {'none': 1} validation_text_dict = {'none': 1}
message_dict = {} message_dict = {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment