Commit 31b4bb58 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: small optimizations

......@@ -203,30 +203,27 @@ class SQLBase(Queue):
processing_node=processing_node,
limit=limit,
group_method_id=group_method_id)
if len(line_list):
if line_list:
self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
return line_list
def getDuplicateMessageUidList(line):
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
line=line, processing_node=processing_node)
if len(uid_list):
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
now_date = self.getNow(activity_tool)
message_list = []
group_method_id = None
uid_to_duplicate_uid_list_dict = {}
try:
result = getReservedMessageList(limit=1)
uid_to_duplicate_uid_list_dict = {}
if len(result) > 0:
result = getReservedMessageList(1)
if result:
line = result[0]
uid = line.uid
m = self.loadMessage(line.message, uid=uid, line=line)
message_list.append(m)
message_list = [m]
uid_to_duplicate_uid_list_dict[uid] = getDuplicateMessageUidList(line)
group_method_id = line.group_method_id
activity_tool.SQLBase_processMessage(table=self.sql_table, uid=[uid])
uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
.extend(getDuplicateMessageUidList(line))
if group_method_id != '\0':
# Count the number of objects to prevent too many objects.
cost = m.activity_kw.get('group_method_cost', .01)
......@@ -238,9 +235,9 @@ class SQLBase(Queue):
if limit > 1: # <=> cost * count < 1
cost *= count
# Retrieve objects which have the same group method.
result = getReservedMessageList(limit=limit,
group_method_id=group_method_id)
path_and_method_id_dict = {}
result = getReservedMessageList(limit, group_method_id)
if self.merge_duplicate:
path_and_method_id_dict = {(line.path, line.method_id): uid}
unreserve_uid_list = []
for line in result:
if line.uid == uid:
......@@ -254,12 +251,11 @@ class SQLBase(Queue):
key = line.path, line.method_id
original_uid = path_and_method_id_dict.get(key)
if original_uid is not None:
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []) \
.append(line.uid)
uid_to_duplicate_uid_list_dict[original_uid].append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
.extend(getDuplicateMessageUidList(line))
uid_to_duplicate_uid_list_dict[line.uid] = \
getDuplicateMessageUidList(line)
if cost < 1:
m = self.loadMessage(line.message, uid=line.uid, line=line)
cost += len(m.getObjectList(activity_tool)) * \
......@@ -272,22 +268,24 @@ class SQLBase(Queue):
# Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=unreserve_uid_list)
return message_list, group_method_id, uid_to_duplicate_uid_list_dict
return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except:
self._log(WARNING, 'Exception while reserving messages.')
if len(message_list):
to_free_uid_list = [m.uid for m in message_list]
if uid_to_duplicate_uid_list_dict:
to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
to_free_uid_list += uid_list
try:
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=to_free_uid_list)
except:
self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
else:
if len(to_free_uid_list):
if to_free_uid_list:
self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
else:
self._log(TRACE, '(no message was reserved)')
return [], None, {}
return [], None, uid_to_duplicate_uid_list_dict
def _abort(self):
try:
......
......@@ -121,11 +121,9 @@ class SQLDict(SQLBase):
group_method_id=line.group_method_id,
)
uid_list = [x.uid for x in result]
if len(uid_list):
if uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node,
uid_list=uid_list
)
processing_node=processing_node, uid=uid_list)
else:
# Release locks
activity_tool.SQLDict_commit()
......
......@@ -9,13 +9,13 @@ class_file:
</dtml-comment>
<params>
processing_node
uid_list
uid
</params>
UPDATE
message
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
uid IN (<dtml-in uid_list><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>)
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
......@@ -3650,6 +3650,23 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally:
del obj.__class__.doSomething
def test_duplicateGroupedMessage(self):
activity_tool = self.portal.portal_activities
obj = activity_tool.newActiveProcess()
obj.reindexObject(activate_kw={'tag': 'foo', 'after_tag': 'bar'})
transaction.commit()
invoked = []
def invokeGroup(self, *args):
invoked.append(len(args[1]))
return ActivityTool_invokeGroup(self, *args)
ActivityTool_invokeGroup = activity_tool.__class__.invokeGroup
try:
activity_tool.__class__.invokeGroup = invokeGroup
self.tic()
finally:
activity_tool.__class__.invokeGroup = ActivityTool_invokeGroup
self.assertEqual(invoked, [1])
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment