Commit 6efbc340 authored by Vincent Pelletier's avatar Vincent Pelletier

Use symbolic constants instead of values 0 and 1 for Message.is_executed .

Mark messages as not executable when either their path or the method to call on it cannot be retrieved.
Make messages marked as not excutable immediately fail with VALIDATE_ERROR_STATE state.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@20311 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent ed272136
......@@ -26,7 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Products.CMFActivity.Errors import ActivityFlushError
from Queue import Queue, VALID
......@@ -83,7 +83,7 @@ class RAMDict(Queue):
for key, m in self.getDict(path).items():
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m)
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
del self.getDict(path)[key]
get_transaction().commit()
return 0
......@@ -133,7 +133,7 @@ class RAMDict(Queue):
# First Validate
if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
......@@ -158,7 +158,7 @@ class RAMDict(Queue):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke:
activity_tool.invoke(m)
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
else:
......
......@@ -26,7 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED
from Queue import Queue, VALID
try:
......@@ -70,7 +70,7 @@ class RAMQueue(Queue):
get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking
activity_tool.invoke(m)
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking
......@@ -117,7 +117,7 @@ class RAMQueue(Queue):
else:
if invoke:
activity_tool.invoke(m)
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
activity_tool.unregisterMessage(self, m)
else:
activity_tool.unregisterMessage(self, m)
......@@ -130,7 +130,7 @@ class RAMQueue(Queue):
else:
if invoke:
activity_tool.invoke(m)
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Only delete if no error happens
else:
self.deleteMessage(activity_tool, m)
......
......@@ -26,7 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously
from RAMDict import RAMDict
......@@ -344,9 +344,10 @@ class SQLDict(RAMDict, SQLBase):
make_available_uid_list = []
message_with_active_process_list = []
notify_user_list = []
something_failed = (len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0)
non_executable_message_list = []
something_failed = (len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0)
for uid, m, priority in message_uid_priority_list:
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
if something_failed:
make_available_uid_list.append(uid)
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
......@@ -357,7 +358,7 @@ class SQLDict(RAMDict, SQLBase):
# XXX: Bug here: Even if a duplicate message has an active_process,
# it won't be called on the duplicate.
message_with_active_process_list.append(m)
else:
elif m.is_executed == MESSAGE_NOT_EXECUTED:
# Should duplicate messages follow strictly the original message, or
# should they be just made available again ?
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
......@@ -376,6 +377,11 @@ class SQLDict(RAMDict, SQLBase):
except:
LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
delay_uid_list.append(uid)
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
# be accessed on object).
non_executable_message_list.append(uid)
if len(deletable_uid_list):
try:
self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list})
......@@ -396,6 +402,11 @@ class SQLDict(RAMDict, SQLBase):
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
if len(non_executable_message_list):
try:
activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
except:
LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info())
if len(make_available_uid_list):
try:
makeMessageListAvailable(make_available_uid_list)
......@@ -472,7 +483,7 @@ class SQLDict(RAMDict, SQLBase):
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
# Abort if something failed.
if len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0:
if len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0:
endTransaction = abortTransactionSynchronously
else:
endTransaction = get_transaction().commit
......@@ -489,7 +500,7 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
raise
for x in message_uid_priority_list:
x[1].is_executed = 0
x[1].is_executed = MESSAGE_NOT_EXECUTED
failed_message_uid_list = [x[0] for x in message_uid_priority_list]
try:
makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
......@@ -541,7 +552,7 @@ class SQLDict(RAMDict, SQLBase):
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
......@@ -572,7 +583,7 @@ class SQLDict(RAMDict, SQLBase):
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
# LOG('SQLDict.flush m.is_executed',0,m.is_executed)
if not m.is_executed: # Make sure message could be invoked
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
......
......@@ -26,7 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
abortTransactionSynchronously
......@@ -197,12 +197,13 @@ class SQLQueue(RAMQueue, SQLBase):
final_error_uid_list = []
message_with_active_process_list = []
notify_user_list = []
non_executable_message_list = []
for uid, m, priority in message_uid_priority_list:
if m.is_executed:
if m.is_executed == MESSAGE_EXECUTED:
deletable_uid_list.append(uid)
if m.active_process:
message_with_active_process_list.append(m)
else:
elif m.is_executed == MESSAGE_NOT_EXECUTED:
if type(m.exc_type) is ClassType and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
......@@ -224,6 +225,11 @@ class SQLQueue(RAMQueue, SQLBase):
LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
# be accessed on object).
non_executable_message_list.append(uid)
if len(deletable_uid_list):
try:
self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list})
......@@ -249,6 +255,12 @@ class SQLQueue(RAMQueue, SQLBase):
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
if len(non_executable_message_list):
try:
activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list,
processing_node=VALIDATE_ERROR_STATE)
except:
LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info())
try:
for m in notify_user_list:
m.notifyUser(activity_tool)
......@@ -287,7 +299,7 @@ class SQLQueue(RAMQueue, SQLBase):
# Try to invoke
try:
activity_tool.invoke(value[1])
if value[1].is_executed:
if value[1].is_executed != MESSAGE_NOT_EXECUTED:
# Commit so that if a message raises it doesn't causes previous
# successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the
......@@ -307,7 +319,7 @@ class SQLQueue(RAMQueue, SQLBase):
# We must make sure that the message is not set as executed.
# It is possible that the message is executed but the commit
# of the transaction fails
value[1].is_executed = 0
value[1].is_executed = MESSAGE_NOT_EXECUTED
try:
makeMessageListAvailable([value[0]])
except:
......@@ -368,7 +380,7 @@ class SQLQueue(RAMQueue, SQLBase):
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
......@@ -391,7 +403,7 @@ class SQLQueue(RAMQueue, SQLBase):
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
......
......@@ -104,6 +104,10 @@ def registerActivity(activity):
activity_instance = activity()
activity_dict[activity.__name__] = activity_instance
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2
class Message:
"""Activity Message Class.
......@@ -128,7 +132,7 @@ class Message:
self.method_id = method_id
self.args = args
self.kw = kw
self.is_executed = 0
self.is_executed = MESSAGE_NOT_EXECUTED
self.exc_type = None
self.exc_value = None
self.traceback = None
......@@ -206,30 +210,41 @@ class Message:
def __call__(self, activity_tool):
try:
obj = self.getObject(activity_tool)
old_security_manager = getSecurityManager()
# Change user if required (TO BE DONE)
# We will change the user only in order to execute this method
user = self.changeUser(self.user_name, activity_tool)
except KeyError:
self.is_executed = MESSAGE_NOT_EXECUTABLE
else:
try:
result = getattr(obj, self.method_id)(*self.args, **self.kw)
finally:
setSecurityManager(old_security_manager)
self.activateResult(activity_tool, result, obj)
self.is_executed = 1
except:
self.is_executed = 0
exc_info = sys.exc_info()
self.exc_type = exc_info[0]
self.exc_value = str(exc_info[1])
self.traceback = ''.join(ExceptionFormatter.format_exception(
*exc_info))
LOG('ActivityTool', WARNING,
'Could not call method %s on object %s' % (
self.method_id, self.object_path), error=exc_info)
# push the error in ZODB error_log
if getattr(activity_tool, 'error_log', None) is not None:
activity_tool.error_log.raising(exc_info)
old_security_manager = getSecurityManager()
# Change user if required (TO BE DONE)
# We will change the user only in order to execute this method
user = self.changeUser(self.user_name, activity_tool)
# XXX: There is no check to see if user is allowed to access
# that method !
method = getattr(obj, self.method_id, None)
try:
if method is None:
self.is_executed = MESSAGE_NOT_EXECUTABLE
else:
result = method(*self.args, **self.kw)
finally:
setSecurityManager(old_security_manager)
if method is not None:
self.activateResult(activity_tool, result, obj)
self.is_executed = MESSAGE_EXECUTED
except:
self.is_executed = MESSAGE_NOT_EXECUTED
exc_info = sys.exc_info()
self.exc_type = exc_info[0]
self.exc_value = str(exc_info[1])
self.traceback = ''.join(ExceptionFormatter.format_exception(
*exc_info))
LOG('ActivityTool', WARNING,
'Could not call method %s on object %s' % (
self.method_id, self.object_path), error=exc_info)
# push the error in ZODB error_log
if getattr(activity_tool, 'error_log', None) is not None:
activity_tool.error_log.raising(exc_info)
def validate(self, activity, activity_tool, check_order_validation=1):
return activity.validate(activity_tool, self,
......@@ -846,13 +861,17 @@ class ActivityTool (Folder, UniqueObject):
expanded_object_list = []
new_message_list = []
path_dict = {}
# Filter the list of messages. If an object is not available, ignore such a message.
# Filter the list of messages. If an object is not available, mark its message as non-executable.
# In addition, expand an object if necessary, and make sure that no duplication happens.
for m in message_list:
# alternate method is used to segregate objects which cannot be grouped.
alternate_method_id = m.activity_kw.get('alternate_method_id')
try:
obj = m.getObject(self)
except KeyError:
m.is_executed = MESSAGE_NOT_EXECUTABLE
continue
try:
i = len(new_message_list) # This is an index of this message in new_message_list.
if m.hasExpandMethod():
for subobj in m.getObjectList(self):
......@@ -890,7 +909,7 @@ class ActivityTool (Folder, UniqueObject):
object_list.append(obj)
new_message_list.append(m)
except:
m.is_executed = 0
m.is_executed = MESSAGE_NOT_EXECUTED
exc_info = sys.exc_info()
m.exc_type = exc_info[0]
m.exc_value = str(exc_info[1])
......@@ -917,7 +936,7 @@ class ActivityTool (Folder, UniqueObject):
traceback = ''.join(ExceptionFormatter.format_exception(
*exc_info))
for m in new_message_list:
m.is_executed = 0
m.is_executed = MESSAGE_NOT_EXECUTED
m.exc_type = exc_type
m.exc_value = exc_value
m.traceback = traceback
......@@ -937,16 +956,16 @@ class ActivityTool (Folder, UniqueObject):
object = object_list[i]
m = new_message_list[i]
if i in failed_message_dict:
m.is_executed = 0
m.is_executed = MESSAGE_NOT_EXECUTED
LOG('ActivityTool', WARNING,
'the method %s partially failed on object %s' %
(m.method_id, m.object_path,))
else:
try:
m.activateResult(self, result, object)
m.is_executed = 1
m.is_executed = MESSAGE_EXECUTED
except:
m.is_executed = 0
m.is_executed = MESSAGE_NOT_EXECUTED
m.exc_type = sys.exc_info()[0]
LOG('ActivityTool', WARNING,
'Could not call method %s on object %s' % (
......
......@@ -2826,6 +2826,93 @@ class TestCMFActivity(ERP5TypeTestCase):
finally:
delattr(Organisation, 'checkAbsoluteUrl')
def CheckMissingActivityContextObject(self, activity):
"""
Check that a message whose context has ben deleted goes to -3
processing_node.
This must happen on first message execution, without any delay.
"""
readMessageList = getattr(self.getPortalObject(), '%s_readMessageList' % (activity, ))
activity_tool = self.getActivityTool()
container = self.getPortal().organisation_module
organisation = container.newContent(portal_type='Organisation')
get_transaction().commit()
self.tic()
organisation.activate(activity=activity).getTitle()
get_transaction().commit()
self.assertEqual(len(activity_tool.getMessageList()), 1)
# Here, we delete the subobject using most low-level method, to avoid
# pending activity to be removed.
organisation_id = organisation.id
container._delOb(organisation_id)
del organisation # Avoid keeping a reference to a deleted object.
get_transaction().commit()
self.assertEqual(getattr(container, organisation_id, None), None)
self.assertEqual(len(activity_tool.getMessageList()), 1)
activity_tool.distribute()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1)), 0)
activity_tool.tic()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1)), 1)
def test_109_checkMissingActivityContextObjectSQLDict(self, quiet=0,
run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck missing activity context object (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckMissingActivityContextObject('SQLDict')
def test_110_checkMissingActivityContextObjectSQLQueue(self, quiet=0,
run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck missing activity context object (SQLQueue)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckMissingActivityContextObject('SQLQueue')
def test_111_checkMissingActivityContextObjectSQLDict(self, quiet=0,
run=run_all_test):
"""
This is similar to tst 108, but here the object will be missing for an
activity with a group_method_id.
"""
if not run: return
if not quiet:
message = '\nCheck missing activity context object with ' \
'group_method_id (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
readMessageList = self.getPortalObject().SQLDict_readMessageList
activity_tool = self.getActivityTool()
container = self.getPortalObject().organisation_module
organisation = container.newContent(portal_type='Organisation')
organisation_2 = container.newContent(portal_type='Organisation')
get_transaction().commit()
self.tic()
organisation.reindexObject()
organisation_2.reindexObject()
get_transaction().commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
# Here, we delete the subobject using most low-level method, to avoid
# pending activity to be removed.
organisation_id = organisation.id
container._delOb(organisation_id)
del organisation # Avoid keeping a reference to a deleted object.
get_transaction().commit()
self.assertEqual(getattr(container, organisation_id, None), None)
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1)), 0)
activity_tool.tic()
self.assertEquals(len(readMessageList(processing_node=-3,
include_processing=1)), 1)
# The message excuted on "organisation_2" must have succeeded.
self.assertEqual(len(activity_tool.getMessageList()), 1)
def test_suite():
suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment