Commit 61aecbbe authored by Vincent Pelletier's avatar Vincent Pelletier

Document finalizeMessageExecution.

Cause partial execution failure to abort transaction.
When some messages succeeded and some failed, make successful messages available for immediate processing, and make failed messages go through usual conditions (involves postponing them).
This fixes a stupid mistake in makeMessageListAvailable invoquation on delay_uid_list instead of [uid] in finalizeMessageExecution..


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@18526 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0180d6b6
...@@ -295,17 +295,32 @@ class SQLDict(RAMDict, SQLBase): ...@@ -295,17 +295,32 @@ class SQLDict(RAMDict, SQLBase):
return [], 0, None return [], 0, None
def finalizeMessageExecution(self, activity_tool, message_uid_priority_list): def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
"""
If everything was fine, delete all messages.
If anything failed, make successfull messages available (if any), and
the following rules apply to failed messages:
- Failures due to ConflictErrors cause messages to be postponed,
but their priority is *not* increased.
- Failures of messages already above maximum priority cause them to
be put in a permanent-error state.
- In all other cases, priotity is increased and message is delayed.
"""
def makeMessageListAvailable(uid_list): def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list) self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
deletable_uid_list = [] deletable_uid_list = []
delay_uid_list = [] delay_uid_list = []
final_error_uid_list = [] final_error_uid_list = []
make_available_uid_list = []
message_with_active_process_list = [] message_with_active_process_list = []
something_failed = (len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0)
for uid, m, priority in message_uid_priority_list: for uid, m, priority in message_uid_priority_list:
if m.is_executed: if m.is_executed:
deletable_uid_list.append(uid) if something_failed:
if m.active_process: make_available_uid_list.append(uid)
message_with_active_process_list.append(m) else:
deletable_uid_list.append(uid)
if m.active_process:
message_with_active_process_list.append(m)
else: else:
if type(m.exc_type) is ClassType and \ if type(m.exc_type) is ClassType and \
issubclass(m.exc_type, ConflictError): issubclass(m.exc_type, ConflictError):
...@@ -317,16 +332,10 @@ class SQLDict(RAMDict, SQLBase): ...@@ -317,16 +332,10 @@ class SQLDict(RAMDict, SQLBase):
# Immediately update, because values different for every message # Immediately update, because values different for every message
activity_tool.SQLDict_setPriority( activity_tool.SQLDict_setPriority(
uid=[uid], uid=[uid],
delay=VALIDATION_ERROR_DELAY,
priority=priority + 1) priority=priority + 1)
except: except:
LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info()) LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
try: delay_uid_list.append(uid)
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed message %r' % (uid, ))
if len(deletable_uid_list): if len(deletable_uid_list):
try: try:
activity_tool.SQLDict_delMessage(uid=deletable_uid_list) activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
...@@ -340,18 +349,20 @@ class SQLDict(RAMDict, SQLBase): ...@@ -340,18 +349,20 @@ class SQLDict(RAMDict, SQLBase):
activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY) activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY)
except: except:
LOG('SQLDict', TRACE, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info()) LOG('SQLDict', TRACE, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
try: make_available_uid_list += delay_uid_list
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (delay_uid_list, ))
if len(final_error_uid_list): if len(final_error_uid_list):
try: try:
activity_tool.SQLDict_assignMessage(uid=final_error_uid_list, activity_tool.SQLDict_assignMessage(uid=final_error_uid_list,
processing_node=INVOKE_ERROR_STATE) processing_node=INVOKE_ERROR_STATE)
except: except:
LOG('SQLDict', WARNING, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info()) LOG('SQLDict', WARNING, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
if len(make_available_uid_list):
try:
makeMessageListAvailable(make_available_uid_list)
except:
LOG('SQLDict', PANIC, 'Failed to unreserve %r' % (make_available_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (make_available_uid_list, ))
for m in message_with_active_process_list: for m in message_with_active_process_list:
active_process = activity_tool.unrestrictedTraverse(m.active_process) active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity(): if not active_process.hasActivity():
...@@ -409,10 +420,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -409,10 +420,8 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', PANIC, LOG('SQLDict', PANIC,
'abort failed, thus some objects may be modified accidentally') 'abort failed, thus some objects may be modified accidentally')
return True # Stop processing messages for this tic call for this queue. return True # Stop processing messages for this tic call for this queue.
# Only abort if nothing succeeded. # Abort if something failed.
# This means that when processing multiple messages, failed ones must not cause if len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0:
# bad things to happen if transaction is commited.
if len([x for x in message_uid_priority_list if x[1].is_executed]) == 0:
endTransaction = abortTransactionSynchronously endTransaction = abortTransactionSynchronously
else: else:
endTransaction = get_transaction().commit endTransaction = get_transaction().commit
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment