Commit 025251c6 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Ensure that SQLQueue never leave messages as processed.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8158 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 563c86c1
...@@ -95,45 +95,56 @@ class SQLQueue(RAMQueue): ...@@ -95,45 +95,56 @@ class SQLQueue(RAMQueue):
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
activity_tool.SQLQueue_processMessage(uid=line.uid) activity_tool.SQLQueue_processMessage(uid=line.uid)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message)
# Make sure object exists # At this point, the message is marked as processed.
validation_state = m.validate(self, activity_tool) try:
if validation_state is not VALID: m = self.loadMessage(line.message)
if validation_state in (EXCEPTION, INVALID_PATH): # Make sure object exists
if line.priority > MAX_PRIORITY: validation_state = m.validate(self, activity_tool)
# This is an error if validation_state is not VALID:
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) if validation_state in (EXCEPTION, INVALID_PATH):
# Assign message back to 'error' state if line.priority > MAX_PRIORITY:
#m.notifyUser(activity_tool) # Notify Error # This is an error.
get_transaction().commit() # and commit # Assign message back to 'error' state.
activity_tool.SQLQueue_assignMessage(uid=line.uid,
processing_node = VALIDATE_ERROR_STATE)
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# Lower priority # We do not lower priority for INVALID_ORDER errors but we do postpone execution
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
else: return 0
# We do not lower priority for INVALID_ORDER errors but we do postpone execution except:
activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, # If any exception occurs, catch it and delay the operation.
priority = line.priority) get_transaction().abort()
get_transaction().commit() # Release locks before starting a potentially long calculation activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
priority = line.priority)
get_transaction().commit()
return 0
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
get_transaction().commit() # If successful, commit
else: else:
# Try to invoke get_transaction().abort() # If not, abort transaction and start a new one
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? if line.priority > MAX_PRIORITY:
if m.is_executed: # Make sure message could be invoked # This is an error
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
get_transaction().commit() # If successful, commit # Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else: else:
get_transaction().abort() # If not, abort transaction and start a new one # Lower priority
if line.priority > MAX_PRIORITY: activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
# This is an error priority = line.priority + 1)
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) get_transaction().commit() # Release locks before starting a potentially long calculation
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment