Commit 55bbf2e7 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: make SQLQueue behave like SQLDict for distribution of serialized grouped messages

Note 'distribute' method is not merged into SQLBase, for 2 reasons:
- SQLQueue still differs from SQLDict because it does not remove duplicate
  messages.
- 'order_validation_text' column only exists in 'message' table

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@37687 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d881edd1
...@@ -300,9 +300,10 @@ class SQLDict(RAMDict, SQLBase): ...@@ -300,9 +300,10 @@ class SQLDict(RAMDict, SQLBase):
message_list.sort(key=sort_message_key) message_list.sort(key=sort_message_key)
deletable_uid_list += [m.uid for m in message_list[1:]] deletable_uid_list += [m.uid for m in message_list[1:]]
message = message_list[0] message = message_list[0]
distributable_uid_set.add(message.uid)
serialization_tag = message.activity_kw.get('serialization_tag') serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is not None: if serialization_tag is None:
distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag, serialization_tag_dict.setdefault(serialization_tag,
[]).append(message) []).append(message)
# Don't let through if there is the same serialization tag in the # Don't let through if there is the same serialization tag in the
...@@ -312,15 +313,15 @@ class SQLDict(RAMDict, SQLBase): ...@@ -312,15 +313,15 @@ class SQLDict(RAMDict, SQLBase):
# does not stop validating together. Because those messages should # does not stop validating together. Because those messages should
# be processed together at once. # be processed together at once.
for message_list in serialization_tag_dict.itervalues(): for message_list in serialization_tag_dict.itervalues():
if len(message_list) == 1:
continue
# Sort list of messages to validate the message with highest score # Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key) message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].activity_kw.get('group_method_id') group_method_id = message_list[0].activity_kw.get('group_method_id')
if group_method_id is None:
continue
for message in message_list[1:]: for message in message_list[1:]:
if group_method_id is None or \ if group_method_id == message.activity_kw.get('group_method_id'):
group_method_id != message.activity_kw.get('group_method_id'): distributable_uid_set.add(message.uid)
distributable_uid_set.remove(message.uid)
if deletable_uid_list: if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table, activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list) uid=deletable_uid_list)
......
...@@ -255,6 +255,12 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -255,6 +255,12 @@ class SQLQueue(RAMQueue, SQLBase):
# Sort list of messages to validate the message with highest score # Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key) message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid) distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].activity_kw.get('group_method_id')
if group_method_id is None:
continue
for message in message_list[1:]:
if group_method_id == message.activity_kw.get('group_method_id'):
distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set) distributable_count = len(distributable_uid_set)
if distributable_count: if distributable_count:
activity_tool.SQLBase_assignMessage(table=self.sql_table, activity_tool.SQLBase_assignMessage(table=self.sql_table,
......
...@@ -3798,7 +3798,49 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3798,7 +3798,49 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.manageClearActivities(keep=0) activity_tool.manageClearActivities(keep=0)
finally: finally:
SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE SQLQueue.MAX_MESSAGE_LIST_SIZE = old_MAX_MESSAGE_LIST_SIZE
def test_125_CheckDistributeWithSerializationTagAndGroupMethodId(self):
activity_tool = self.portal.portal_activities
obj1 = activity_tool.newActiveProcess()
obj2 = activity_tool.newActiveProcess()
transaction.commit()
self.tic()
group_method_call_list = []
def doSomething(self, message_list):
group_method_call_list.append(sorted((obj.getPath(), args, kw)
for obj, args, kw in message_list))
del message_list[:]
activity_tool.__class__.doSomething = doSomething
try:
for activity in 'SQLDict', 'SQLQueue':
activity_kw = dict(activity=activity, serialization_tag=self.id(),
group_method_id='portal_activities/doSomething')
obj1.activate(**activity_kw).dummy(1, x=None)
obj2.activate(**activity_kw).dummy(2, y=None)
transaction.commit()
activity_tool.distribute()
activity_tool.tic()
self.assertEqual(group_method_call_list.pop(),
sorted([(obj1.getPath(), (1,), dict(x=None)),
(obj2.getPath(), (2,), dict(y=None))]))
self.assertFalse(group_method_call_list)
self.assertFalse(activity_tool.getMessageList())
obj1.activate(priority=2, **activity_kw).dummy1(1, x=None)
obj1.activate(priority=1, **activity_kw).dummy2(2, y=None)
message1 = obj1.getPath(), (1,), dict(x=None)
message2 = obj1.getPath(), (2,), dict(y=None)
transaction.commit()
activity_tool.distribute()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.tic()
self.assertEqual(group_method_call_list.pop(),
dict(SQLDict=[message2],
SQLQueue=[message1, message2])[activity])
self.assertFalse(group_method_call_list)
self.assertFalse(activity_tool.getMessageList())
finally:
del activity_tool.__class__.doSomething
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment