Commit d881edd1 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: add message grouping support for processing SQLQueue

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@37686 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent e954de07
...@@ -148,12 +148,15 @@ class SQLBase: ...@@ -148,12 +148,15 @@ class SQLBase:
This number is guaranted not to be exceeded. This number is guaranted not to be exceeded.
If None (or not given) no limit apply. If None (or not given) no limit apply.
""" """
result = not group_method_id and \ select = activity_tool.SQLBase_selectReservedMessageList
activity_tool.SQLDict_selectReservedMessageList( result = not group_method_id and select(table=self.sql_table, count=limit,
processing_node=processing_node, count=limit) processing_node=processing_node)
if not result: if not result:
activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id) activity_tool.SQLBase_reserveMessageList(table=self.sql_table,
result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit) count=limit, processing_node=processing_node, to_date=date,
group_method_id=group_method_id)
result = select(table=self.sql_table,
processing_node=processing_node, count=limit)
return result return result
def makeMessageListAvailable(self, activity_tool, uid_list): def makeMessageListAvailable(self, activity_tool, uid_list):
...@@ -161,7 +164,8 @@ class SQLBase: ...@@ -161,7 +164,8 @@ class SQLBase:
Put messages back in processing_node=0 . Put messages back in processing_node=0 .
""" """
if len(uid_list): if len(uid_list):
activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list) activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
uid=uid_list)
def getProcessableMessageList(self, activity_tool, processing_node): def getProcessableMessageList(self, activity_tool, processing_node):
""" """
...@@ -222,7 +226,7 @@ class SQLBase: ...@@ -222,7 +226,7 @@ class SQLBase:
m = self.loadMessage(line.message, uid=uid, line=line) m = self.loadMessage(line.message, uid=uid, line=line)
message_list.append(m) message_list.append(m)
group_method_id = line.group_method_id group_method_id = line.group_method_id
activity_tool.SQLDict_processMessage(uid=[uid]) activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
uid_to_duplicate_uid_list_dict.setdefault(uid, []) \ uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
.extend(getDuplicateMessageUidList(line)) .extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'): if group_method_id not in (None, '', '\0'):
...@@ -243,22 +247,26 @@ class SQLBase: ...@@ -243,22 +247,26 @@ class SQLBase:
# We read each line once so lines have distinct uids. # We read each line once so lines have distinct uids.
# So what remains to be filtered on are path, method_id and # So what remains to be filtered on are path, method_id and
# order_validation_text. # order_validation_text.
key = (line.path, line.method_id, line.order_validation_text) try:
original_uid = path_and_method_id_dict.get(key) key = line.path, line.method_id, line.order_validation_text
if original_uid is not None: except AttributeError:
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \ pass # message_queue does not have 'order_validation_text'
.append(line.uid) else:
continue original_uid = path_and_method_id_dict.get(key)
path_and_method_id_dict[key] = line.uid if original_uid is not None:
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \ uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
.extend(getDuplicateMessageUidList(line)) .append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
.extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS: if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid, line=line) m = self.loadMessage(line.message, uid=line.uid, line=line)
count += len(m.getObjectList(activity_tool)) count += len(m.getObjectList(activity_tool))
message_list.append(m) message_list.append(m)
else: else:
unreserve_uid_list.append(line.uid) unreserve_uid_list.append(line.uid)
activity_tool.SQLDict_processMessage( activity_tool.SQLBase_processMessage(table=self.sql_table,
uid=[m.uid for m in message_list]) uid=[m.uid for m in message_list])
# Unreserve extra messages as soon as possible. # Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool, self.makeMessageListAvailable(activity_tool=activity_tool,
...@@ -298,7 +306,8 @@ class SQLBase: ...@@ -298,7 +306,8 @@ class SQLBase:
group_method_id = group_method_id.split('\0')[0] group_method_id = group_method_id.split('\0')[0]
if group_method_id not in (None, ""): if group_method_id not in (None, ""):
method = activity_tool.invokeGroup method = activity_tool.invokeGroup
args = (group_method_id, message_list) args = (group_method_id, message_list, self.__class__.__name__,
self.merge_duplicate)
activity_runtime_environment = ActivityRuntimeEnvironment(None) activity_runtime_environment = ActivityRuntimeEnvironment(None)
else: else:
method = activity_tool.invoke method = activity_tool.invoke
......
...@@ -52,6 +52,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -52,6 +52,7 @@ class SQLDict(RAMDict, SQLBase):
because use of OOBTree. because use of OOBTree.
""" """
sql_table = 'message' sql_table = 'message'
merge_duplicate = True
# Transaction commit methods # Transaction commit methods
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
......
...@@ -29,15 +29,9 @@ ...@@ -29,15 +29,9 @@
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from Queue import VALID, INVALID_PATH from Queue import VALID, INVALID_PATH
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from types import ClassType
import sys
from time import time
from SQLBase import SQLBase, sort_message_key from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
import transaction import transaction
...@@ -48,17 +42,6 @@ from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE ...@@ -48,17 +42,6 @@ from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
MAX_VALIDATED_LIMIT = 1000 MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate. # Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000 READ_MESSAGE_LIMIT = 1000
# Process this many messages in each dequeueMessage call.
# Downside of setting to a "small" value: the cost of reserving a batch of
# few messages increases relatively to the cost of executing activities,
# making CMFActivity overhead significant.
# Downside of setting to a "big" value: if there are many slow activities in
# a multi-activity-node environment, multiple slow activities will be reserved
# by a single node, making a suboptimal use of the parallelisation offered by
# the cluster.
# Before increasing this value, consider using SQLDict with group methods
# first.
MESSAGE_BUNDLE_SIZE = 1
MAX_MESSAGE_LIST_SIZE = 100 MAX_MESSAGE_LIST_SIZE = 100
...@@ -69,6 +52,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -69,6 +52,7 @@ class SQLQueue(RAMQueue, SQLBase):
because use of OOBTree. because use of OOBTree.
""" """
sql_table = 'message_queue' sql_table = 'message_queue'
merge_duplicate = False
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
message_list = [m for m in message_list if m.is_registered] message_list = [m for m in message_list if m.is_registered]
...@@ -83,6 +67,8 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -83,6 +67,8 @@ class SQLQueue(RAMQueue, SQLBase):
method_id_list = [m.method_id for m in registered_message_list] method_id_list = [m.method_id for m in registered_message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list] priority_list = [m.activity_kw.get('priority', 1) for m in registered_message_list]
date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list] date_list = [m.activity_kw.get('at_date', None) for m in registered_message_list]
group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
for message in registered_message_list]
tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list] tag_list = [m.activity_kw.get('tag', '') for m in registered_message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list] serialization_tag_list = [m.activity_kw.get('serialization_tag', '') for m in registered_message_list]
dumped_message_list = [self.dumpMessage(m) for m in registered_message_list] dumped_message_list = [self.dumpMessage(m) for m in registered_message_list]
...@@ -92,6 +78,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -92,6 +78,7 @@ class SQLQueue(RAMQueue, SQLBase):
method_id_list=method_id_list, method_id_list=method_id_list,
priority_list=priority_list, priority_list=priority_list,
message_list=dumped_message_list, message_list=dumped_message_list,
group_method_id_list = group_method_id_list,
date_list=date_list, date_list=date_list,
tag_list=tag_list, tag_list=tag_list,
processing_node_list=None, processing_node_list=None,
...@@ -110,162 +97,14 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -110,162 +97,14 @@ class SQLQueue(RAMQueue, SQLBase):
# Nothing to do in SQLQueue. # Nothing to do in SQLQueue.
pass pass
def getReservedMessageList(self, activity_tool, date, processing_node, limit=None): def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
""" """
Get and reserve a list of messages. Reserve unreserved messages matching given line.
limit Return their uids.
Maximum number of messages to fetch.
This number is not garanted to be reached, because of:
- not enough messages being pending execution
- race condition (other nodes reserving the same messages at the same
time)
This number is guaranted not to be exceeded.
If None (or not given) no limit apply.
""" """
result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit) return ()
if len(result) == 0:
activity_tool.SQLQueue_reserveMessageList(count=limit, processing_node=processing_node, to_date=date)
result = activity_tool.SQLQueue_selectReservedMessageList(processing_node=processing_node, count=limit)
return result
def makeMessageListAvailable(self, activity_tool, uid_list):
"""
Put messages back in processing_node=0 .
"""
if len(uid_list):
activity_tool.SQLQueue_makeMessageListAvailable(uid_list=uid_list)
def getProcessableMessageList(self, activity_tool, processing_node):
"""
Always true:
For each reserved message, delete redundant messages when it gets
reserved (definitely lost, but they are expandable since redundant).
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- get one message from the reserved bunch (this messages will be
"needed")
- increase the number of impacted object
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list
If any error happens in above described process, try to unreserve all
messages already reserved in that process.
If it fails, complain loudly that some messages might still be in an
unclean state.
Returned values:
list of messages
"""
def getReservedMessageList(limit):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
date=now_date,
processing_node=processing_node,
limit=limit)
if len(line_list):
LOG('SQLQueue', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
now_date = self.getNow(activity_tool)
message_list = []
try:
result = getReservedMessageList(limit=MESSAGE_BUNDLE_SIZE)
for line in result:
m = self.loadMessage(line.message, uid=line.uid, line=line)
message_list.append(m)
if len(message_list):
activity_tool.SQLQueue_processMessage(uid=[m.uid for x in message_list])
return message_list
except:
LOG('SQLQueue', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
to_free_uid_list = [m.uid for m in message_list]
try:
makeMessageListAvailable(to_free_uid_list)
except:
LOG('SQLQueue', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
if len(to_free_uid_list):
LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
else:
LOG('SQLQueue', TRACE, '(no message was reserved)')
return []
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
message_list = \
self.getProcessableMessageList(activity_tool, processing_node)
if message_list:
processing_stop_time = time() + 30 # Stop processing after more than 10 seconds were spent
processed_count = 0
# Commit right before executing messages.
# As MySQL transaction does not start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called
# on objects which are not available - or available in an old
# version - to ZODB connector.
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
transaction.commit()
tv = getTransactionalVariable(None)
for m in message_list:
tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
processed_count += 1
# Try to invoke
try:
activity_tool.invoke(m)
if m.getExecutionState() != MESSAGE_NOT_EXECUTED:
# Commit so that if a message raises it doesn't causes previous
# successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the
# same "try" block.
transaction.commit()
else:
# This message failed, abort.
transaction.abort()
except:
value = m.uid, m.object_path, m.method_id
LOG('SQLQueue', WARNING, 'Exception raised when invoking message (uid, path, method_id) %r' % (value, ), error=sys.exc_info())
try:
transaction.abort()
except:
# Unfortunately, database adapters may raise an exception against abort.
LOG('SQLQueue', PANIC, 'abort failed, thus some objects may be modified accidentally')
raise
# We must make sure that the message is not set as executed.
# It is possible that the message is executed but the commit
# of the transaction fails
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
# XXX Is it still useful to free message now that this node is able
# to reselect it ?
try:
makeMessageListAvailable([m.uid])
except:
LOG('SQLQueue', ERROR, 'Failed to free message: %r' % (value, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed message %r' % (value, ))
if time() > processing_stop_time:
LOG('SQLQueue', TRACE, 'Stop processing message batch because processing delay exceeded')
break
# Release all unprocessed messages
to_free_uid_list = [m.uid for m in message_list[processed_count:]]
if to_free_uid_list:
try:
makeMessageListAvailable(to_free_uid_list)
except:
LOG('SQLQueue', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
LOG('SQLQueue', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
self.finalizeMessageExecution(activity_tool,
message_list[:processed_count])
transaction.commit()
return not message_list
dequeueMessage = SQLBase.dequeueMessage
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None): def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None) hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
......
...@@ -1172,7 +1172,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1172,7 +1172,7 @@ class ActivityTool (Folder, UniqueObject):
for held in my_self.REQUEST._held: for held in my_self.REQUEST._held:
self.REQUEST._hold(held) self.REQUEST._hold(held)
def invokeGroup(self, method_id, message_list): def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
if self.activity_tracking: if self.activity_tracking:
activity_tracking_logger.info( activity_tracking_logger.info(
'invoking group messages: method_id=%s, paths=%s' 'invoking group messages: method_id=%s, paths=%s'
...@@ -1202,20 +1202,22 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1202,20 +1202,22 @@ class ActivityTool (Folder, UniqueObject):
else: else:
subobject_list = (obj,) subobject_list = (obj,)
for subobj in subobject_list: for subobj in subobject_list:
path = subobj.getPath() if merge_duplicate:
if path not in path_set: path = subobj.getPath()
if path in path_set:
continue
path_set.add(path) path_set.add(path)
if alternate_method_id is not None \ if alternate_method_id is not None \
and hasattr(aq_base(subobj), alternate_method_id): and hasattr(aq_base(subobj), alternate_method_id):
# if this object is alternated, # if this object is alternated,
# generate a new single active object # generate a new single active object
activity_kw = m.activity_kw.copy() activity_kw = m.activity_kw.copy()
activity_kw.pop('group_method_id', None) activity_kw.pop('group_method_id', None)
activity_kw.pop('group_id', None) activity_kw.pop('group_id', None)
active_obj = subobj.activate(**activity_kw) active_obj = subobj.activate(activity=activity, **activity_kw)
getattr(active_obj, alternate_method_id)(*m.args, **m.kw) getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
else: else:
expanded_object_list.append((subobj, m.args, m.kw)) expanded_object_list.append((subobj, m.args, m.kw))
new_message_list.append((m, obj)) new_message_list.append((m, obj))
except: except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self) m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
......
...@@ -7,15 +7,14 @@ cache_time:0 ...@@ -7,15 +7,14 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid_list</params> <params>table
uid</params>
UPDATE UPDATE
message <dtml-var table>
SET SET
processing_node=0, processing_node=0,
processing=0 processing=0
WHERE WHERE
uid IN ( <dtml-sqltest uid type="int" multiple>
<dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
)
<dtml-var sql_delimiter> <dtml-var sql_delimiter>
COMMIT COMMIT
...@@ -7,14 +7,14 @@ cache_time:0 ...@@ -7,14 +7,14 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid</params> <params>table
UPDATE message uid</params>
UPDATE
<dtml-var table>
SET SET
processing_date = UTC_TIMESTAMP(), processing_date = UTC_TIMESTAMP(),
processing = 1 processing = 1
WHERE WHERE
uid IN ( <dtml-sqltest uid type="int" multiple>
<dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
<dtml-var sql_delimiter> <dtml-var sql_delimiter>
COMMIT COMMIT
...@@ -7,19 +7,22 @@ cache_time:0 ...@@ -7,19 +7,22 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>processing_node <params>table
processing_node
to_date to_date
count count
group_method_id group_method_id
</params> </params>
UPDATE UPDATE
message <dtml-var table>
SET SET
processing_node=<dtml-sqlvar processing_node type="int"> processing_node=<dtml-sqlvar processing_node type="int">
WHERE WHERE
processing_node=0 processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime"> AND date <= <dtml-sqlvar to_date type="datetime">
<dtml-if expr="group_method_id is not None"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if> <dtml-if expr="group_method_id is not None">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
</dtml-if>
ORDER BY ORDER BY
<dtml-comment> <dtml-comment>
Explanation of the order by: Explanation of the order by:
......
...@@ -7,12 +7,13 @@ cache_time:0 ...@@ -7,12 +7,13 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>processing_node <params>table
processing_node
count</params> count</params>
SELECT SELECT
* *
FROM FROM
message_queue <dtml-var table>
WHERE WHERE
processing_node = <dtml-sqlvar processing_node type="int"> processing_node = <dtml-sqlvar processing_node type="int">
<dtml-if expr="count is not None"> <dtml-if expr="count is not None">
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
count</params>
SELECT
*
FROM
message
WHERE
processing_node = <dtml-sqlvar processing_node type="int">
<dtml-if expr="count is not None">
LIMIT <dtml-sqlvar count type="int">
</dtml-if>
...@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` ( ...@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
`processing` TINYINT NOT NULL DEFAULT 0, `processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME, `processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0, `priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL, `tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL, `serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>uid_list</params>
UPDATE
message_queue
SET
processing_node=0,
processing=0
WHERE
uid IN (
<dtml-in prefix="uid" expr="uid_list"><dtml-sqlvar uid_item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>
)
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>processing_node
to_date
count
</params>
UPDATE
message_queue
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime">
ORDER BY
<dtml-comment>
Explanation of the order by:
- priority must be respected (it is a feature)
- when multiple nodes simultaneously try to fetch activities, they should not
be given the same set of lines as it would cause all minus one to wait for
a write lock (and be ultimately aborted), effectively serializing their
action (so breaking paralellism).
So we must force MySQL to update lines in a random order.
</dtml-comment>
priority, RAND()
LIMIT <dtml-sqlvar count type="int">
<dtml-var sql_delimiter>
COMMIT
...@@ -15,11 +15,12 @@ message_list ...@@ -15,11 +15,12 @@ message_list
priority_list priority_list
processing_node_list processing_node_list
date_list date_list
group_method_id_list
tag_list tag_list
serialization_tag_list serialization_tag_list
</params> </params>
INSERT INTO message_queue INSERT INTO message_queue
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, tag, serialization_tag, message) (uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
VALUES VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))"> <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if> <dtml-if sequence-start><dtml-else>,</dtml-if>
...@@ -32,6 +33,7 @@ VALUES ...@@ -32,6 +33,7 @@ VALUES
<dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>, <dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
0, 0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">, <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">, <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">, <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string"> <dtml-sqlvar expr="message_list[loop_item]" type="string">
......
...@@ -430,7 +430,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -430,7 +430,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def TryActiveProcessInsideActivity(self, activity): def TryActiveProcessInsideActivity(self, activity):
""" """
Try two levels with active_process, we create one first Try two levels with active_process, we create one first
activity with an acitive process, then this new activity activity with an active process, then this new activity
uses another active process uses another active process
""" """
portal = self.getPortal() portal = self.getPortal()
...@@ -1872,27 +1872,26 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1872,27 +1872,26 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
getattr(organisation, 'uid') getattr(organisation, 'uid')
def test_80_CallWithGroupIdParamater(self, quiet=0, run=run_all_test): def callWithGroupIdParamater(self, activity, quiet, run):
"""
Test that group_id parameter is used to separate execution of the same method
"""
if not run: return if not run: return
if not quiet: if not quiet:
message = '\nTest Activity with group_id parameter' message = '\nTest Activity with group_id parameter (%s)' % activity
ZopeTestCase._print(message) ZopeTestCase._print(message)
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
portal = self.getPortal() portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id) organisation = portal.organisation._getOb(self.company_id)
# Defined a group method # Defined a group method
foobar_list = []
def setFoobar(self, object_list): def setFoobar(self, object_list):
foobar_list.append(len(object_list))
for obj, args, kw in object_list: for obj, args, kw in object_list:
number = kw.get('number', 1) number = kw.get('number', 1)
if getattr(obj,'foobar', None) is not None: if getattr(obj,'foobar', None) is not None:
obj.foobar = obj.foobar + number obj.foobar = obj.foobar + number
else: else:
obj.foobar = number obj.foobar = number
object_list[:] = [] del object_list[:]
from Products.ERP5Type.Document.Folder import Folder from Products.ERP5Type.Document.Folder import Folder
Folder.setFoobar = setFoobar Folder.setFoobar = setFoobar
...@@ -1905,46 +1904,65 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1905,46 +1904,65 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Test group_method_id is working without group_id # Test group_method_id is working without group_id
for x in xrange(5): for x in xrange(5):
organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar").reindexObject(number=1) organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
transaction.commit() transaction.commit()
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5) self.assertEquals(len(message_list),5)
portal.portal_activities.distribute() portal.portal_activities.distribute()
portal.portal_activities.tic() portal.portal_activities.tic()
self.assertEquals(1, organisation.getFoobar()) expected = dict(SQLDict=1, SQLQueue=5)[activity]
self.assertEquals(expected, organisation.getFoobar())
# Test group_method_id is working with one group_id defined # Test group_method_id is working with one group_id defined
for x in xrange(5): for x in xrange(5):
organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1) organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit() transaction.commit()
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5) self.assertEquals(len(message_list),5)
portal.portal_activities.distribute() portal.portal_activities.distribute()
portal.portal_activities.tic() portal.portal_activities.tic()
self.assertEquals(2, organisation.getFoobar()) self.assertEquals(expected * 2, organisation.getFoobar())
self.assertEquals([expected, expected], foobar_list)
del foobar_list[:]
# Test group_method_id is working with many group_id defined # Test group_method_id is working with many group_id defined
for x in xrange(5): for x in xrange(5):
organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1) organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit() transaction.commit()
organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3) organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
transaction.commit() transaction.commit()
organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1) organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
transaction.commit() transaction.commit()
organisation.activate(activity='SQLDict', group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5) organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="3").reindexObject(number=5)
transaction.commit() transaction.commit()
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),20) self.assertEquals(len(message_list),20)
portal.portal_activities.distribute() portal.portal_activities.distribute()
portal.portal_activities.tic() portal.portal_activities.tic()
self.assertEquals(11, organisation.getFoobar()) self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
organisation.getFoobar())
self.assertEquals(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity],
sorted(foobar_list))
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list), 0) self.assertEquals(len(message_list), 0)
def test_80a_CallWithGroupIdParamaterSQLDict(self, quiet=0, run=run_all_test):
"""
Test that group_id parameter is used to separate execution of the same method
"""
self.callWithGroupIdParamater('SQLDict', quiet=quiet, run=run)
def test_80b_CallWithGroupIdParamaterSQLQueue(self, quiet=0,
run=run_all_test):
"""
Test that group_id parameter is used to separate execution of the same method
"""
self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test): def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
""" """
...@@ -3148,7 +3166,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3148,7 +3166,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(message_list), 1) self.assertEqual(len(message_list), 1)
message = message_list[0] message = message_list[0]
portal.organisation_module._delOb(organisation.id) portal.organisation_module._delOb(organisation.id)
activity_tool.invokeGroup('getTitle', [message]) activity_tool.invokeGroup('getTitle', [message], 'SQLDict', True)
checkMessage(message, KeyError) checkMessage(message, KeyError)
activity_tool.manageCancel(message.object_path, message.method_id) activity_tool.manageCancel(message.object_path, message.method_id)
# 2: activity method does not exist when activity is executed # 2: activity method does not exist when activity is executed
...@@ -3157,7 +3175,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3157,7 +3175,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = activity_tool.getMessageList() message_list = activity_tool.getMessageList()
self.assertEqual(len(message_list), 1) self.assertEqual(len(message_list), 1)
message = message_list[0] message = message_list[0]
activity_tool.invokeGroup('this_method_does_not_exist', [message]) activity_tool.invokeGroup('this_method_does_not_exist',
[message], 'SQLDict', True)
checkMessage(message, KeyError) checkMessage(message, KeyError)
activity_tool.manageCancel(message.object_path, message.method_id) activity_tool.manageCancel(message.object_path, message.method_id)
...@@ -3739,7 +3758,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3739,7 +3758,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict') self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLDict')
@expectedFailure
def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test): def test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue(self, quiet=0, run=run_all_test):
if not run: return if not run: return
if not quiet: if not quiet:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment