Commit 8e9bfedf authored by Yoshinori Okuji's avatar Yoshinori Okuji

Do not delay or set to an error if an exception is raised in dequeueMessage,...

Do not delay or set to an error if an exception is raised in dequeueMessage, because invoke or invokeGroup should never emit an exception, so an exception should be nothing with a message itself.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13713 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 76a4dd49
############################################################################## ##############################################################################
# #
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved. # Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com> # Jean-Paul Smets-Solanes <jp@nexedi.com>
# #
# WARNING: This program as such is intended to be used by professional # WARNING: This program as such is intended to be used by professional
...@@ -313,7 +313,7 @@ class SQLDict(RAMDict): ...@@ -313,7 +313,7 @@ class SQLDict(RAMDict):
break break
else: else:
get_transaction().abort() get_transaction().abort()
except Exception, exc: except:
# If an exception occurs, abort the transaction to minimize the impact, # If an exception occurs, abort the transaction to minimize the impact,
try: try:
get_transaction().abort() get_transaction().abort()
...@@ -321,33 +321,20 @@ class SQLDict(RAMDict): ...@@ -321,33 +321,20 @@ class SQLDict(RAMDict):
# Unfortunately, database adapters may raise an exception against abort. # Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally') LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally')
pass pass
# An exception happens at somewhere else but invoke or invokeGroup, so messages
# themselves should not be delayed.
try: try:
if isinstance(exc, ConflictError): for uid_list in uid_list_list:
# For a conflict error, simply delay the operations. if len(uid_list):
for uid_list in uid_list_list: # This only sets processing to zero.
if len(uid_list): activity_tool.SQLDict_setPriority(uid = uid_list)
activity_tool.SQLDict_setPriority(uid = uid_list, get_transaction().commit()
delay = VALIDATION_ERROR_DELAY,
retry = 1)
else:
# For other exceptions, put the messages to an invalid state immediately.
for uid_list in uid_list_list:
if len(uid_list):
activity_tool.SQLDict_assignMessage(uid = uid_list,
processing_node = INVOKE_ERROR_STATE)
LOG('SQLDict', WARNING,
'Error in ActivityTool.invoke', error=sys.exc_info())
get_transaction().commit()
except: except:
LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception', LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
error=sys.exc_info()) error=sys.exc_info())
raise raise
return 0 return 0
except:
LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised an exception which is not a subclass of Exception',
error=sys.exc_info())
raise
try: try:
for i in xrange(len(message_list)): for i in xrange(len(message_list)):
......
############################################################################## ##############################################################################
# #
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved. # Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com> # Jean-Paul Smets-Solanes <jp@nexedi.com>
# #
# WARNING: This program as such is intended to be used by professional # WARNING: This program as such is intended to be used by professional
...@@ -42,7 +42,7 @@ try: ...@@ -42,7 +42,7 @@ try:
except ImportError: except ImportError:
pass pass
from zLOG import LOG, WARNING from zLOG import LOG, WARNING, ERROR
MAX_PRIORITY = 5 MAX_PRIORITY = 5
...@@ -125,7 +125,7 @@ class SQLQueue(RAMQueue): ...@@ -125,7 +125,7 @@ class SQLQueue(RAMQueue):
activity_tool.invoke(m) # Try to invoke the message activity_tool.invoke(m) # Try to invoke the message
if m.is_executed: # Make sure message could be invoked if m.is_executed: # Make sure message could be invoked
get_transaction().commit() # If successful, commit get_transaction().commit() # If successful, commit
except Exception, exc: except:
# If an exception occurs, abort the transaction to minimize the impact, # If an exception occurs, abort the transaction to minimize the impact,
try: try:
get_transaction().abort() get_transaction().abort()
...@@ -134,48 +134,50 @@ class SQLQueue(RAMQueue): ...@@ -134,48 +134,50 @@ class SQLQueue(RAMQueue):
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally') LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
pass pass
if isinstance(exc, ConflictError): # An exception happens at somewhere else but invoke, so messages
# If a conflict occurs, delay the operation. # themselves should not be delayed.
activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
priority = line.priority)
else:
# For the other exceptions, put it into an error state.
activity_tool.SQLQueue_assignMessage(uid = line.uid,
processing_node = INVOKE_ERROR_STATE)
LOG('SQLQueue', WARNING,
'Error in ActivityTool.invoke', error=sys.exc_info())
get_transaction().commit()
return 0
if m.is_executed:
activity_tool.SQLQueue_delMessage(uid=[line.uid]) # Delete it
else:
try: try:
# If not, abort transaction and start a new one activity_tool.SQLQueue_setPriority(uid = line.uid, date = line.date,
get_transaction().abort() priority = line.priority)
get_transaction().commit()
except: except:
# Unfortunately, database adapters may raise an exception against abort. LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception',
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally') error=sys.exc_info())
pass raise
return 0
if type(m.exc_type) is ClassType \ try:
and issubclass(m.exc_type, ConflictError): if m.is_executed:
activity_tool.SQLQueue_setPriority(uid = line.uid, activity_tool.SQLQueue_delMessage(uid=[line.uid]) # Delete it
date = next_processing_date,
priority = line.priority)
elif line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLQueue_assignMessage(uid = line.uid,
processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
else: else:
# Lower priority try:
activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, # If not, abort transaction and start a new one
priority = line.priority + 1) get_transaction().abort()
get_transaction().commit() except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
pass
if type(m.exc_type) is ClassType \
and issubclass(m.exc_type, ConflictError):
activity_tool.SQLQueue_setPriority(uid = line.uid,
date = next_processing_date,
priority = line.priority)
elif line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLQueue_assignMessage(uid = line.uid,
processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
priority = line.priority + 1)
get_transaction().commit()
except:
LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
error=sys.exc_info())
raise
return 0 return 0
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment