diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index b376e035c383890243fa20c196a615cc34523516..3a1bcce44565e8d6e75ade02ec68a7644d554a66 100644 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED from Products.CMFActivity.Errors import ActivityFlushError from Queue import Queue, VALID @@ -83,7 +83,7 @@ class RAMDict(Queue): for key, m in self.getDict(path).items(): if m.validate(self, activity_tool) is VALID: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: del self.getDict(path)[key] get_transaction().commit() return 0 @@ -133,7 +133,7 @@ class RAMDict(Queue): # First Validate if m.validate(self, activity_tool) is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (method_id , path)) @@ -158,7 +158,7 @@ class RAMDict(Queue): LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) if invoke: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: method_dict[m.method_id] = 1 self.deleteMessage(activity_tool, m) else: diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index 27f52a86fac6a6eaef0e0ce697c18e73e63b8fc4..d2bb08aad675d7eec7bc5cff13257312c3bce438 100644 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED from Queue import Queue, VALID try: @@ -70,7 +70,7 @@ class RAMQueue(Queue): get_transaction().commit() # Start a new transaction return 0 # Keep on ticking activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) get_transaction().commit() # Start a new transaction return 0 # Keep on ticking @@ -117,7 +117,7 @@ class RAMQueue(Queue): else: if invoke: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: activity_tool.unregisterMessage(self, m) else: activity_tool.unregisterMessage(self, m) @@ -130,7 +130,7 @@ class RAMQueue(Queue): else: if invoke: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: self.deleteMessage(activity_tool, m) # Only delete if no error happens else: self.deleteMessage(activity_tool, m) diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 8732ca15b2eea9f788f9594d25199ee1eafdb3f0..c6e3b3f8cc31135504a9ec6ac111782f1c333817 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ abortTransactionSynchronously from RAMDict import RAMDict @@ -344,9 +344,10 @@ class SQLDict(RAMDict, SQLBase): make_available_uid_list = [] message_with_active_process_list = [] notify_user_list = [] - something_failed = (len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0) + non_executable_message_list = [] + something_failed = (len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0) for uid, m, priority in message_uid_priority_list: - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: if something_failed: make_available_uid_list.append(uid) make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) @@ -357,7 +358,7 @@ class SQLDict(RAMDict, SQLBase): # XXX: Bug here: Even if a duplicate message has an active_process, # it won't be called on the duplicate. message_with_active_process_list.append(m) - else: + elif m.is_executed == MESSAGE_NOT_EXECUTED: # Should duplicate messages follow strictly the original message, or # should they be just made available again ? make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) @@ -376,6 +377,11 @@ class SQLDict(RAMDict, SQLBase): except: LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info()) delay_uid_list.append(uid) + else: + # Internal CMFActivity error: the message can not be executed because + # something is missing (context object cannot be found, method cannot + # be accessed on object). + non_executable_message_list.append(uid) if len(deletable_uid_list): try: self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list}) @@ -396,6 +402,11 @@ class SQLDict(RAMDict, SQLBase): processing_node=INVOKE_ERROR_STATE) except: LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info()) + if len(non_executable_message_list): + try: + activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE) + except: + LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info()) if len(make_available_uid_list): try: makeMessageListAvailable(make_available_uid_list) @@ -472,7 +483,7 @@ class SQLDict(RAMDict, SQLBase): else: LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list)) # Abort if something failed. - if len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0: + if len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0: endTransaction = abortTransactionSynchronously else: endTransaction = get_transaction().commit @@ -489,7 +500,7 @@ class SQLDict(RAMDict, SQLBase): LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.') raise for x in message_uid_priority_list: - x[1].is_executed = 0 + x[1].is_executed = MESSAGE_NOT_EXECUTED failed_message_uid_list = [x[0] for x in message_uid_priority_list] try: makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict) @@ -541,7 +552,7 @@ class SQLDict(RAMDict, SQLBase): validate_value = m.validate(self, activity_tool) if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (m.method_id , path)) @@ -572,7 +583,7 @@ class SQLDict(RAMDict, SQLBase): if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? # LOG('SQLDict.flush m.is_executed',0,m.is_executed) - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (m.method_id , path)) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index dda0d2a604434d65d783df8ed7ea5d37e53aa294..d222636263fa0cc849ecb80a1694b4bbcdf87e7d 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from RAMQueue import RAMQueue from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ abortTransactionSynchronously @@ -197,12 +197,13 @@ class SQLQueue(RAMQueue, SQLBase): final_error_uid_list = [] message_with_active_process_list = [] notify_user_list = [] + non_executable_message_list = [] for uid, m, priority in message_uid_priority_list: - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: deletable_uid_list.append(uid) if m.active_process: message_with_active_process_list.append(m) - else: + elif m.is_executed == MESSAGE_NOT_EXECUTED: if type(m.exc_type) is ClassType and \ issubclass(m.exc_type, ConflictError): delay_uid_list.append(uid) @@ -224,6 +225,11 @@ class SQLQueue(RAMQueue, SQLBase): LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info()) else: LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, )) + else: + # Internal CMFActivity error: the message can not be executed because + # something is missing (context object cannot be found, method cannot + # be accessed on object). + non_executable_message_list.append(uid) if len(deletable_uid_list): try: self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list}) @@ -249,6 +255,12 @@ class SQLQueue(RAMQueue, SQLBase): processing_node=INVOKE_ERROR_STATE) except: LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info()) + if len(non_executable_message_list): + try: + activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list, + processing_node=VALIDATE_ERROR_STATE) + except: + LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info()) try: for m in notify_user_list: m.notifyUser(activity_tool) @@ -287,7 +299,7 @@ class SQLQueue(RAMQueue, SQLBase): # Try to invoke try: activity_tool.invoke(value[1]) - if value[1].is_executed: + if value[1].is_executed != MESSAGE_NOT_EXECUTED: # Commit so that if a message raises it doesn't causes previous # successfull messages to be rolled back. This commit might fail, # so it is protected the same way as activity execution by the @@ -307,7 +319,7 @@ class SQLQueue(RAMQueue, SQLBase): # We must make sure that the message is not set as executed. # It is possible that the message is executed but the commit # of the transaction fails - value[1].is_executed = 0 + value[1].is_executed = MESSAGE_NOT_EXECUTED try: makeMessageListAvailable([value[0]]) except: @@ -368,7 +380,7 @@ class SQLQueue(RAMQueue, SQLBase): validate_value = m.validate(self, activity_tool) if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (m.method_id , path)) @@ -391,7 +403,7 @@ class SQLQueue(RAMQueue, SQLBase): validate_value = m.validate(self, activity_tool) if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (method_id , path)) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 7cd0a3648e00eb813612d8dba976afa4aa18c3fc..715928e03cd87356783002fc2a4d8ae0815a23a7 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -104,6 +104,10 @@ def registerActivity(activity): activity_instance = activity() activity_dict[activity.__name__] = activity_instance +MESSAGE_NOT_EXECUTED = 0 +MESSAGE_EXECUTED = 1 +MESSAGE_NOT_EXECUTABLE = 2 + class Message: """Activity Message Class. @@ -128,7 +132,7 @@ class Message: self.method_id = method_id self.args = args self.kw = kw - self.is_executed = 0 + self.is_executed = MESSAGE_NOT_EXECUTED self.exc_type = None self.exc_value = None self.traceback = None @@ -206,30 +210,41 @@ class Message: def __call__(self, activity_tool): try: obj = self.getObject(activity_tool) - old_security_manager = getSecurityManager() - # Change user if required (TO BE DONE) - # We will change the user only in order to execute this method - user = self.changeUser(self.user_name, activity_tool) + except KeyError: + self.is_executed = MESSAGE_NOT_EXECUTABLE + else: try: - result = getattr(obj, self.method_id)(*self.args, **self.kw) - finally: - setSecurityManager(old_security_manager) - - self.activateResult(activity_tool, result, obj) - self.is_executed = 1 - except: - self.is_executed = 0 - exc_info = sys.exc_info() - self.exc_type = exc_info[0] - self.exc_value = str(exc_info[1]) - self.traceback = ''.join(ExceptionFormatter.format_exception( - *exc_info)) - LOG('ActivityTool', WARNING, - 'Could not call method %s on object %s' % ( - self.method_id, self.object_path), error=exc_info) - # push the error in ZODB error_log - if getattr(activity_tool, 'error_log', None) is not None: - activity_tool.error_log.raising(exc_info) + old_security_manager = getSecurityManager() + # Change user if required (TO BE DONE) + # We will change the user only in order to execute this method + user = self.changeUser(self.user_name, activity_tool) + # XXX: There is no check to see if user is allowed to access + # that method ! + method = getattr(obj, self.method_id, None) + try: + if method is None: + self.is_executed = MESSAGE_NOT_EXECUTABLE + else: + result = method(*self.args, **self.kw) + finally: + setSecurityManager(old_security_manager) + + if method is not None: + self.activateResult(activity_tool, result, obj) + self.is_executed = MESSAGE_EXECUTED + except: + self.is_executed = MESSAGE_NOT_EXECUTED + exc_info = sys.exc_info() + self.exc_type = exc_info[0] + self.exc_value = str(exc_info[1]) + self.traceback = ''.join(ExceptionFormatter.format_exception( + *exc_info)) + LOG('ActivityTool', WARNING, + 'Could not call method %s on object %s' % ( + self.method_id, self.object_path), error=exc_info) + # push the error in ZODB error_log + if getattr(activity_tool, 'error_log', None) is not None: + activity_tool.error_log.raising(exc_info) def validate(self, activity, activity_tool, check_order_validation=1): return activity.validate(activity_tool, self, @@ -846,13 +861,17 @@ class ActivityTool (Folder, UniqueObject): expanded_object_list = [] new_message_list = [] path_dict = {} - # Filter the list of messages. If an object is not available, ignore such a message. + # Filter the list of messages. If an object is not available, mark its message as non-executable. # In addition, expand an object if necessary, and make sure that no duplication happens. for m in message_list: # alternate method is used to segregate objects which cannot be grouped. alternate_method_id = m.activity_kw.get('alternate_method_id') try: obj = m.getObject(self) + except KeyError: + m.is_executed = MESSAGE_NOT_EXECUTABLE + continue + try: i = len(new_message_list) # This is an index of this message in new_message_list. if m.hasExpandMethod(): for subobj in m.getObjectList(self): @@ -890,7 +909,7 @@ class ActivityTool (Folder, UniqueObject): object_list.append(obj) new_message_list.append(m) except: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED exc_info = sys.exc_info() m.exc_type = exc_info[0] m.exc_value = str(exc_info[1]) @@ -917,7 +936,7 @@ class ActivityTool (Folder, UniqueObject): traceback = ''.join(ExceptionFormatter.format_exception( *exc_info)) for m in new_message_list: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED m.exc_type = exc_type m.exc_value = exc_value m.traceback = traceback @@ -937,16 +956,16 @@ class ActivityTool (Folder, UniqueObject): object = object_list[i] m = new_message_list[i] if i in failed_message_dict: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED LOG('ActivityTool', WARNING, 'the method %s partially failed on object %s' % (m.method_id, m.object_path,)) else: try: m.activateResult(self, result, object) - m.is_executed = 1 + m.is_executed = MESSAGE_EXECUTED except: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED m.exc_type = sys.exc_info()[0] LOG('ActivityTool', WARNING, 'Could not call method %s on object %s' % ( diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index c695cf0c78600fd257a475001d73f4009d48d1e2..69e2cc8f805cc8576d6bcb0b998b6ced7d9e4d86 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2826,6 +2826,93 @@ class TestCMFActivity(ERP5TypeTestCase): finally: delattr(Organisation, 'checkAbsoluteUrl') + def CheckMissingActivityContextObject(self, activity): + """ + Check that a message whose context has ben deleted goes to -3 + processing_node. + This must happen on first message execution, without any delay. + """ + readMessageList = getattr(self.getPortalObject(), '%s_readMessageList' % (activity, )) + activity_tool = self.getActivityTool() + container = self.getPortal().organisation_module + organisation = container.newContent(portal_type='Organisation') + get_transaction().commit() + self.tic() + organisation.activate(activity=activity).getTitle() + get_transaction().commit() + self.assertEqual(len(activity_tool.getMessageList()), 1) + # Here, we delete the subobject using most low-level method, to avoid + # pending activity to be removed. + organisation_id = organisation.id + container._delOb(organisation_id) + del organisation # Avoid keeping a reference to a deleted object. + get_transaction().commit() + self.assertEqual(getattr(container, organisation_id, None), None) + self.assertEqual(len(activity_tool.getMessageList()), 1) + activity_tool.distribute() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 0) + activity_tool.tic() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 1) + + def test_109_checkMissingActivityContextObjectSQLDict(self, quiet=0, + run=run_all_test): + if not run: return + if not quiet: + message = '\nCheck missing activity context object (SQLDict)' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.CheckMissingActivityContextObject('SQLDict') + + def test_110_checkMissingActivityContextObjectSQLQueue(self, quiet=0, + run=run_all_test): + if not run: return + if not quiet: + message = '\nCheck missing activity context object (SQLQueue)' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.CheckMissingActivityContextObject('SQLQueue') + + def test_111_checkMissingActivityContextObjectSQLDict(self, quiet=0, + run=run_all_test): + """ + This is similar to tst 108, but here the object will be missing for an + activity with a group_method_id. + """ + if not run: return + if not quiet: + message = '\nCheck missing activity context object with ' \ + 'group_method_id (SQLDict)' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + readMessageList = self.getPortalObject().SQLDict_readMessageList + activity_tool = self.getActivityTool() + container = self.getPortalObject().organisation_module + organisation = container.newContent(portal_type='Organisation') + organisation_2 = container.newContent(portal_type='Organisation') + get_transaction().commit() + self.tic() + organisation.reindexObject() + organisation_2.reindexObject() + get_transaction().commit() + self.assertEqual(len(activity_tool.getMessageList()), 2) + # Here, we delete the subobject using most low-level method, to avoid + # pending activity to be removed. + organisation_id = organisation.id + container._delOb(organisation_id) + del organisation # Avoid keeping a reference to a deleted object. + get_transaction().commit() + self.assertEqual(getattr(container, organisation_id, None), None) + self.assertEqual(len(activity_tool.getMessageList()), 2) + activity_tool.distribute() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 0) + activity_tool.tic() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 1) + # The message excuted on "organisation_2" must have succeeded. + self.assertEqual(len(activity_tool.getMessageList()), 1) def test_suite(): suite = unittest.TestSuite()