Commit e69ff3a5 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Rescue the situation where a conflict error happens after executing an active object.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8244 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 5fa35e8d
......@@ -291,45 +291,48 @@ class SQLDict(RAMDict):
# Release locks before starting a potentially long calculation
get_transaction().commit()
# Try to invoke
if group_method_id is not None:
LOG('SQLDict', TRACE,
'invoking a group method %s with %d objects '\
' (%d objects in expanded form)' % (
group_method_id, len(message_list), count))
activity_tool.invokeGroup(group_method_id, message_list)
else:
activity_tool.invoke(message_list[0])
# Check if messages are executed successfully.
# When some of them are executed successfully, it may not be acceptable to
# abort the transaction, because these remain pending, only due to other
# invalid messages. This means that a group method should not be used if
# it has a side effect. For now, only indexing uses a group method, and this
# has no side effect.
for m in message_list:
if m.is_executed:
get_transaction().commit()
break
else:
get_transaction().abort()
except ConflictError:
# If a conflict occurs, abort the transaction to minimize the impact,
# then simply delay the operations.
get_transaction().abort()
for uid_list in uid_list_list:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
retry = 1)
if len(uid_list):
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
retry = 1)
get_transaction().commit()
return 0
except:
# For other exceptions, put the messages to an invalid state immediately.
get_transaction().abort()
for uid_list in uid_list_list:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
if len(uid_list):
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
get_transaction().commit()
return 0
# Try to invoke
if group_method_id is not None:
LOG('SQLDict', TRACE,
'invoking a group method %s with %d objects '\
' (%d objects in expanded form)' % (
group_method_id, len(message_list), count))
activity_tool.invokeGroup(group_method_id, message_list)
else:
activity_tool.invoke(message_list[0])
# Check if messages are executed successfully.
# When some of them are executed successfully, it may not be acceptable to
# abort the transaction, because these remain pending, only due to other
# invalid messages. This means that a group method should not be used if
# it has a side effect. For now, only indexing uses a group method, and this
# has no side effect.
for m in message_list:
if m.is_executed:
break
else:
get_transaction().abort()
for i in xrange(len(message_list)):
m = message_list[i]
uid_list = uid_list_list[i]
......
......@@ -119,6 +119,11 @@ class SQLQueue(RAMQueue):
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
get_transaction().commit() # If successful, commit
except ConflictError:
# If a conflict occurs, catch it and delay the operation.
get_transaction().abort()
......@@ -134,11 +139,8 @@ class SQLQueue(RAMQueue):
return 0
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
if m.is_executed:
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
get_transaction().commit() # If successful, commit
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
......@@ -146,12 +148,11 @@ class SQLQueue(RAMQueue):
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
get_transaction().commit()
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment