Commit 0793cd95 authored by Julien Muchembled's avatar Julien Muchembled

Refactoring: move finalizeMessageExecution to SQLBase

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@32876 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent b54a8418
...@@ -26,8 +26,17 @@ ...@@ -26,8 +26,17 @@
# #
############################################################################## ##############################################################################
from zLOG import LOG, INFO, WARNING import sys
from zLOG import LOG, TRACE, INFO, WARNING, ERROR
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Queue import VALIDATION_ERROR_DELAY
MAX_PRIORITY = 5
class SQLBase: class SQLBase:
""" """
...@@ -74,19 +83,14 @@ class SQLBase: ...@@ -74,19 +83,14 @@ class SQLBase:
priority = default priority = default
return priority return priority
def _retryOnLockError(self, method, args=(), kw=None): def _retryOnLockError(self, method, args=(), kw={}):
if kw is None:
kw = {}
while True: while True:
try: try:
result = method(*args, **kw) return method(*args, **kw)
except ConflictError: except ConflictError:
# Note that this code assumes that a database adapter translates # Note that this code assumes that a database adapter translates
# a lock error into a conflict error. # a lock error into a conflict error.
LOG('SQLBase', INFO, 'Got a lock error, retrying...') LOG('SQLBase', INFO, 'Got a lock error, retrying...')
else:
break
return result
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, activity_tool, message, value):
return self._validate(activity_tool, method_id=value) return self._validate(activity_tool, method_id=value)
...@@ -117,3 +121,115 @@ class SQLBase: ...@@ -117,3 +121,115 @@ class SQLBase:
def _validate_serialization_tag(self, activity_tool, message, value): def _validate_serialization_tag(self, activity_tool, message, value):
return self._validate(activity_tool, serialization_tag=value) return self._validate(activity_tool, serialization_tag=value)
def _log(self, severity, summary):
LOG(self.__class__.__name__, severity, summary,
error=severity>INFO and sys.exc_info() or None)
def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None):
"""
If everything was fine, delete all messages.
If anything failed, make successful messages available (if any), and
the following rules apply to failed messages:
- Failures due to ConflictErrors cause messages to be postponed,
but their priority is *not* increased.
- Failures of messages already above maximum priority cause them to
be put in a permanent-error state.
- In all other cases, priority is increased and message is delayed.
"""
deletable_uid_list = []
delay_uid_list = []
final_error_uid_list = []
make_available_uid_list = []
notify_user_list = []
non_executable_message_list = []
executed_uid_list = deletable_uid_list
if uid_to_duplicate_uid_list_dict is not None:
for m in message_list:
if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
executed_uid_list = make_available_uid_list
break
for m in message_list:
uid = m.uid
if m.getExecutionState() == MESSAGE_EXECUTED:
executed_uid_list.append(uid)
if uid_to_duplicate_uid_list_dict is not None:
executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
# Should duplicate messages follow strictly the original message, or
# should they be just made available again ?
if uid_to_duplicate_uid_list_dict is not None:
make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
priority = m.line.priority
# BACK: Only exceptions can be classes in Python 2.6.
# Once we drop support for Python 2.4,
# please, remove the "type(m.exc_type) is type(ConflictError)" check
# and leave only the "issubclass(m.exc_type, ConflictError)" check.
if type(m.exc_type) is type(ConflictError) and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
elif priority > MAX_PRIORITY:
notify_user_list.append(m)
final_error_uid_list.append(uid)
else:
try:
# Immediately update, because values different for every message
activity_tool.SQLBase_reactivate(table=self.sql_table,
uid=[uid],
delay=None,
priority=priority + 1)
except:
self._log(WARNING, 'Failed to increase priority of %r' % uid)
delay_uid_list.append(uid)
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
# be accessed on object).
non_executable_message_list.append(uid)
if deletable_uid_list:
try:
self._retryOnLockError(activity_tool.SQLBase_delMessage,
kw={'table': self.sql_table,
'uid': deletable_uid_list})
except:
self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
else:
self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
if delay_uid_list:
try:
# If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLBase_reactivate(table=self.sql_table,
uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
except:
self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
make_available_uid_list += delay_uid_list
if final_error_uid_list:
try:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE)
except:
self._log(ERROR, 'Failed to set message to error state for %r'
% final_error_uid_list)
if non_executable_message_list:
try:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
except:
self._log(ERROR, 'Failed to set message to invalid path state for %r'
% non_executable_message_list)
if make_available_uid_list:
try:
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=make_available_uid_list)
except:
self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
else:
self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
try:
for m in notify_user_list:
m.notifyUser(activity_tool)
except:
# Notification failures must not cause this method to raise.
self._log(WARNING,
'Exception during notification phase of finalizeMessageExecution')
...@@ -27,8 +27,7 @@ ...@@ -27,8 +27,7 @@
############################################################################## ##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
abortTransactionSynchronously
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
...@@ -47,7 +46,6 @@ except ImportError: ...@@ -47,7 +46,6 @@ except ImportError:
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
MAX_PRIORITY = 5
# Stop validating more messages when this limit is reached # Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000 MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate. # Read up to this number of messages to validate.
...@@ -64,6 +62,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -64,6 +62,8 @@ class SQLDict(RAMDict, SQLBase):
and provide sequentiality. Should not create conflict and provide sequentiality. Should not create conflict
because use of OOBTree. because use of OOBTree.
""" """
sql_table = 'message'
# Transaction commit methods # Transaction commit methods
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
message_list = [m for m in message_list if m.is_registered] message_list = [m for m in message_list if m.is_registered]
...@@ -104,7 +104,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -104,7 +104,7 @@ class SQLDict(RAMDict, SQLBase):
order_validation_text = order_validation_text) order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list] uid_list = [x.uid for x in uid_list]
if len(uid_list)>0: if len(uid_list)>0:
activity_tool.SQLDict_delMessage(uid = uid_list) activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
def finishQueueMessage(self, activity_tool_path, m): def finishQueueMessage(self, activity_tool_path, m):
# Nothing to do in SQLDict. # Nothing to do in SQLDict.
...@@ -131,26 +131,6 @@ class SQLDict(RAMDict, SQLBase): ...@@ -131,26 +131,6 @@ class SQLDict(RAMDict, SQLBase):
message_list = activity_buffer.getMessageList(self) message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered] return [m for m in message_list if m.is_registered]
def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
validation_state = message.validate(self, activity_tool, check_order_validation=0)
if validation_state is not VALID:
# There is a serious validation error - we must lower priority
if priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
priority=priority + 1, retry=1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
return 1
def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None): def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
""" """
Get and reserve a list of messages. Get and reserve a list of messages.
...@@ -321,105 +301,6 @@ class SQLDict(RAMDict, SQLBase): ...@@ -321,105 +301,6 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', TRACE, '(no message was reserved)') LOG('SQLDict', TRACE, '(no message was reserved)')
return [], 0, None, {} return [], 0, None, {}
def finalizeMessageExecution(self, activity_tool, message_list, uid_to_duplicate_uid_list_dict):
"""
If everything was fine, delete all messages.
If anything failed, make successful messages available (if any), and
the following rules apply to failed messages:
- Failures due to ConflictErrors cause messages to be postponed,
but their priority is *not* increased.
- Failures of messages already above maximum priority cause them to
be put in a permanent-error state.
- In all other cases, priority is increased and message is delayed.
"""
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
deletable_uid_list = []
delay_uid_list = []
final_error_uid_list = []
make_available_uid_list = []
notify_user_list = []
non_executable_message_list = []
something_failed = (len([m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0)
for m in message_list:
uid = m.uid
if m.getExecutionState() == MESSAGE_EXECUTED:
if something_failed:
make_available_uid_list.append(uid)
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
else:
deletable_uid_list.append(uid)
deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
# Should duplicate messages follow strictly the original message, or
# should they be just made available again ?
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
priority = m.line.priority
# BACK: Only exceptions can be classes in Python 2.6.
# Once we drop support for Python 2.4,
# please, remove the "type(m.exc_type) is type(ConflictError)" check
# and leave only the "issubclass(m.exc_type, ConflictError)" check.
if type(m.exc_type) is type(ConflictError) and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
elif priority > MAX_PRIORITY:
notify_user_list.append(m)
final_error_uid_list.append(uid)
else:
try:
# Immediately update, because values different for every message
activity_tool.SQLDict_setPriority(
uid=[uid],
delay=None,
retry=None,
priority=priority + 1)
except:
LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
delay_uid_list.append(uid)
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
# be accessed on object).
non_executable_message_list.append(uid)
if len(deletable_uid_list):
try:
self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list})
except:
LOG('SQLDict', ERROR, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
if len(delay_uid_list):
try:
# If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLDict_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None, retry=None)
except:
LOG('SQLDict', ERROR, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
make_available_uid_list += delay_uid_list
if len(final_error_uid_list):
try:
activity_tool.SQLDict_assignMessage(uid=final_error_uid_list,
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
if len(non_executable_message_list):
try:
activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
except:
LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info())
if len(make_available_uid_list):
try:
makeMessageListAvailable(make_available_uid_list)
except:
LOG('SQLDict', ERROR, 'Failed to unreserve %r' % (make_available_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (make_available_uid_list, ))
try:
for m in notify_user_list:
m.notifyUser(activity_tool)
except:
# Notification failures must not cause this method to raise.
LOG('SQLDict', WARNING, 'Exception during notification phase of finalizeMessageExecution', error=sys.exc_info())
# Queue semantic # Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict): def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
...@@ -595,7 +476,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -595,7 +476,8 @@ class SQLDict(RAMDict, SQLBase):
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
order_validation_text=None) order_validation_text=None)
if len(uid_list)>0: if len(uid_list)>0:
activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list]) activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[x.uid for x in uid_list])
getMessageList = SQLBase.getMessageList getMessageList = SQLBase.getMessageList
...@@ -672,11 +554,13 @@ class SQLDict(RAMDict, SQLBase): ...@@ -672,11 +554,13 @@ class SQLDict(RAMDict, SQLBase):
if group_method_id is not None: if group_method_id is not None:
serialization_tag_group_method_id_dict[serialization_tag] = group_method_id serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
if deletable_uid_list: if deletable_uid_list:
activity_tool.SQLDict_delMessage(uid=deletable_uid_list) activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
distributable_count = len(message_dict) distributable_count = len(message_dict)
if distributable_count: if distributable_count:
activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()]) activity_tool.SQLBase_assignMessage(table=self.sql_table,
processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
validated_count += distributable_count validated_count += distributable_count
if validated_count < MAX_VALIDATED_LIMIT: if validated_count < MAX_VALIDATED_LIMIT:
offset += READ_MESSAGE_LIMIT offset += READ_MESSAGE_LIMIT
......
...@@ -28,8 +28,7 @@ ...@@ -28,8 +28,7 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
abortTransactionSynchronously
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
...@@ -48,7 +47,6 @@ except ImportError: ...@@ -48,7 +47,6 @@ except ImportError:
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
MAX_PRIORITY = 5
# Stop validating more messages when this limit is reached # Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000 MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate. # Read this many messages to validate.
...@@ -73,6 +71,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -73,6 +71,7 @@ class SQLQueue(RAMQueue, SQLBase):
and provide sequentiality. Should not create conflict and provide sequentiality. Should not create conflict
because use of OOBTree. because use of OOBTree.
""" """
sql_table = 'message_queue'
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
message_list = [m for m in message_list if m.is_registered] message_list = [m for m in message_list if m.is_registered]
...@@ -103,7 +102,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -103,7 +102,7 @@ class SQLQueue(RAMQueue, SQLBase):
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
#LOG("prepareDeleteMessage", 0, str(m.__dict__)) #LOG("prepareDeleteMessage", 0, str(m.__dict__))
activity_tool.SQLQueue_delMessage(uid = [m.uid]) activity_tool.SQLBase_delMessage(table=self.sql_table, uid=[m.uid])
def finishQueueMessage(self, activity_tool_path, m): def finishQueueMessage(self, activity_tool_path, m):
# Nothing to do in SQLQueue. # Nothing to do in SQLQueue.
...@@ -199,88 +198,6 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -199,88 +198,6 @@ class SQLQueue(RAMQueue, SQLBase):
LOG('SQLQueue', TRACE, '(no message was reserved)') LOG('SQLQueue', TRACE, '(no message was reserved)')
return [] return []
def finalizeMessageExecution(self, activity_tool, message_list):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
deletable_uid_list = []
delay_uid_list = []
final_error_uid_list = []
notify_user_list = []
non_executable_message_list = []
for m in message_list:
uid = m.uid
if m.getExecutionState() == MESSAGE_EXECUTED:
deletable_uid_list.append(uid)
elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
priority = m.line.priority
# BACK: Only exceptions can be classes in Python 2.6.
# Once we drop support for Python 2.4,
# please, remove the "type(m.exc_type) is type(ConflictError)" check
# and leave only the "issubclass(m.exc_type, ConflictError)" check.
if type(m.exc_type) is type(ConflictError) and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
elif priority > MAX_PRIORITY:
notify_user_list.append(m)
final_error_uid_list.append(uid)
else:
try:
# Immediately update, because values different for every message
activity_tool.SQLQueue_setPriority(
uid=[uid],
delay=VALIDATION_ERROR_DELAY,
priority=priority + 1)
except:
LOG('SQLQueue', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info())
try:
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, ))
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
# be accessed on object).
non_executable_message_list.append(uid)
if len(deletable_uid_list):
try:
self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list})
except:
LOG('SQLQueue', ERROR, 'Failed to delete messages %r' % (deletable_uid_list, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Deleted messages %r' % (deletable_uid_list, ))
if len(delay_uid_list):
try:
# If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLQueue_setPriority(uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
except:
LOG('SQLQueue', ERROR, 'Failed to delay %r' % (delay_uid_list, ), error=sys.exc_info())
try:
makeMessageListAvailable(delay_uid_list)
except:
LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (delay_uid_list, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed messages %r' % (delay_uid_list, ))
if len(final_error_uid_list):
try:
activity_tool.SQLQueue_assignMessage(uid=final_error_uid_list,
processing_node=INVOKE_ERROR_STATE)
except:
LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info())
if len(non_executable_message_list):
try:
activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list,
processing_node=VALIDATE_ERROR_STATE)
except:
LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info())
try:
for m in notify_user_list:
m.notifyUser(activity_tool)
except:
# Notification failures must not cause this method to raise.
LOG('SQLQueue', WARNING, 'Exception during notification phase of finalizeMessageExecution', error=sys.exc_info())
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list): def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list) self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
...@@ -430,7 +347,8 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -430,7 +347,8 @@ class SQLQueue(RAMQueue, SQLBase):
'Could not validate %s on %s' % (m.method_id , path)) 'Could not validate %s on %s' % (m.method_id , path))
if len(result): if len(result):
activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result]) activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[line.uid for line in result])
getMessageList = SQLBase.getMessageList getMessageList = SQLBase.getMessageList
...@@ -489,7 +407,8 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -489,7 +407,8 @@ class SQLQueue(RAMQueue, SQLBase):
validation_text_dict, now_date=now_date) validation_text_dict, now_date=now_date)
distributable_count = len(message_dict) distributable_count = len(message_dict)
if distributable_count: if distributable_count:
activity_tool.SQLQueue_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()]) activity_tool.SQLBase_assignMessage(table=self.sql_table,
processing_node=0, uid=[m.uid for m in message_dict.itervalues()])
validated_count += distributable_count validated_count += distributable_count
if validated_count < MAX_VALIDATED_LIMIT: if validated_count < MAX_VALIDATED_LIMIT:
offset += READ_MESSAGE_LIMIT offset += READ_MESSAGE_LIMIT
......
...@@ -7,17 +7,16 @@ cache_time:0 ...@@ -7,17 +7,16 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params> <params>table
processing_node processing_node
uid uid:list
</params> </params>
UPDATE message UPDATE
<dtml-var table>
SET SET
processing_node=<dtml-sqlvar processing_node type="int">, processing_node=<dtml-sqlvar processing_node type="int">,
processing=0 processing=0
WHERE WHERE
uid IN ( <dtml-sqltest uid type="int" multiple>
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
<dtml-var sql_delimiter> <dtml-var sql_delimiter>
COMMIT COMMIT
...@@ -7,9 +7,10 @@ cache_time:0 ...@@ -7,9 +7,10 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid:list</params> <params>table
uid:list
</params>
DELETE FROM DELETE FROM
message <dtml-var table>
WHERE WHERE
<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if> <dtml-sqltest uid type="int" multiple>
</dtml-in>
...@@ -7,25 +7,20 @@ cache_time:0 ...@@ -7,25 +7,20 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid:list <params>table
uid:list
priority priority
retry
delay delay
</params> </params>
UPDATE UPDATE
message <dtml-var table>
SET SET
processing = 0 processing = 0
<dtml-if expr="priority is not None"> <dtml-if expr="priority is not None">
, priority = <dtml-sqlvar priority type="int"> , priority = <dtml-sqlvar priority type="int">
</dtml-if> </dtml-if>
<dtml-if expr="delay is not None"> <dtml-if expr="delay is not None">
, date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND) , date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
</dtml-if>
<dtml-if expr="retry is not None">
, retry = retry + <dtml-sqlvar retry type="int">
</dtml-if> </dtml-if>
WHERE WHERE
uid IN ( <dtml-sqltest uid type="int" multiple>
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid</params>
UPDATE message_queue
SET
processing_node=<dtml-sqlvar processing_node type="int">,
processing=0
WHERE
uid IN (
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>uid</params>
DELETE FROM
message_queue
WHERE
<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>uid
priority
delay
</params>
UPDATE
message_queue
SET
processing = 0
<dtml-if expr="priority is not None">
, priority = <dtml-sqlvar priority type="int">
</dtml-if>
<dtml-if expr="delay is not None">
, date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
</dtml-if>
WHERE
uid IN (
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
...@@ -35,6 +35,7 @@ from Products.ERP5Type.tests.utils import DummyMailHost ...@@ -35,6 +35,7 @@ from Products.ERP5Type.tests.utils import DummyMailHost
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\ from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.CMFActivity.Activity.SQLDict import SQLDict
from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
#from Products.ERP5Type.Document.Organisation import Organisation #from Products.ERP5Type.Document.Organisation import Organisation
# The above cannot be imported at top level because it doesn't exist until # The above cannot be imported at top level because it doesn't exist until
...@@ -3512,7 +3513,8 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -3512,7 +3513,8 @@ class TestCMFActivity(ERP5TypeTestCase):
1) 1)
finally: finally:
# Clear activities from all nodes # Clear activities from all nodes
activity_tool.SQLDict_delMessage(uid=[message.uid for message in result]) activity_tool.SQLBase_delMessage(table=SQLDict.sql_table,
uid=[message.uid for message in result])
get_transaction().commit() get_transaction().commit()
def test_116_RaiseInCommitBeforeMessageExecution(self): def test_116_RaiseInCommitBeforeMessageExecution(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment