diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 88ba532406a8ae604b414263cdee648d67aff4c0..ad9577b7f9a07fa61aed6d802fc131791e9fd69b 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -291,45 +291,48 @@ class SQLDict(RAMDict): # Release locks before starting a potentially long calculation get_transaction().commit() + + # Try to invoke + if group_method_id is not None: + LOG('SQLDict', TRACE, + 'invoking a group method %s with %d objects '\ + ' (%d objects in expanded form)' % ( + group_method_id, len(message_list), count)) + activity_tool.invokeGroup(group_method_id, message_list) + else: + activity_tool.invoke(message_list[0]) + + # Check if messages are executed successfully. + # When some of them are executed successfully, it may not be acceptable to + # abort the transaction, because these remain pending, only due to other + # invalid messages. This means that a group method should not be used if + # it has a side effect. For now, only indexing uses a group method, and this + # has no side effect. + for m in message_list: + if m.is_executed: + get_transaction().commit() + break + else: + get_transaction().abort() except ConflictError: # If a conflict occurs, abort the transaction to minimize the impact, # then simply delay the operations. get_transaction().abort() for uid_list in uid_list_list: - activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, - retry = 1) + if len(uid_list): + activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, + retry = 1) get_transaction().commit() return 0 except: # For other exceptions, put the messages to an invalid state immediately. get_transaction().abort() for uid_list in uid_list_list: - activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) + if len(uid_list): + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) get_transaction().commit() return 0 - # Try to invoke - if group_method_id is not None: - LOG('SQLDict', TRACE, - 'invoking a group method %s with %d objects '\ - ' (%d objects in expanded form)' % ( - group_method_id, len(message_list), count)) - activity_tool.invokeGroup(group_method_id, message_list) - else: - activity_tool.invoke(message_list[0]) - - # Check if messages are executed successfully. - # When some of them are executed successfully, it may not be acceptable to - # abort the transaction, because these remain pending, only due to other - # invalid messages. This means that a group method should not be used if - # it has a side effect. For now, only indexing uses a group method, and this - # has no side effect. - for m in message_list: - if m.is_executed: - break - else: - get_transaction().abort() - for i in xrange(len(message_list)): m = message_list[i] uid_list = uid_list_list[i] diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index a804c53840497a8501e257efceddd4d6b3cc2bca..ea7695aa61c16e653e9f59cc1258752ea53642ee 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -119,6 +119,11 @@ class SQLQueue(RAMQueue): priority = line.priority) get_transaction().commit() # Release locks before starting a potentially long calculation return 0 + + # Try to invoke + activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? + if m.is_executed: # Make sure message could be invoked + get_transaction().commit() # If successful, commit except ConflictError: # If a conflict occurs, catch it and delay the operation. get_transaction().abort() @@ -134,11 +139,8 @@ class SQLQueue(RAMQueue): return 0 - # Try to invoke - activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? - if m.is_executed: # Make sure message could be invoked + if m.is_executed: activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it - get_transaction().commit() # If successful, commit else: get_transaction().abort() # If not, abort transaction and start a new one if line.priority > MAX_PRIORITY: @@ -146,12 +148,11 @@ class SQLQueue(RAMQueue): activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) # Assign message back to 'error' state m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit else: # Lower priority activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, priority = line.priority + 1) - get_transaction().commit() # Release locks before starting a potentially long calculation + get_transaction().commit() return 0 get_transaction().commit() # Release locks before starting a potentially long calculation return 1