Commit 94c6a887 authored by Julien Muchembled's avatar Julien Muchembled

Fully respect priorities and dates when distributing activities

- When deleting duplicate messages, keep the one the highest score (priority,
  date, uid).
- When several messages have the same serialization_tag, always validate first
  the one with the highest score.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@34632 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 54da2428
...@@ -507,62 +507,61 @@ class SQLDict(RAMDict, SQLBase): ...@@ -507,62 +507,61 @@ class SQLDict(RAMDict, SQLBase):
validation_text_dict = {'none': 1} validation_text_dict = {'none': 1}
message_dict = {} message_dict = {}
for line in result: for line in result:
message = self.loadMessage(line.message, uid = line.uid, message = self.loadMessage(line.message, uid=line.uid, line=line,
order_validation_text = line.order_validation_text) order_validation_text=line.order_validation_text)
self.getExecutableMessageList(activity_tool, message, message_dict, self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date) validation_text_dict, now_date=now_date)
if message_dict: if message_dict:
message_unique_set = set() def sort_message_key(message):
# same sort key as in SQLDict_readMessageList
return message.line.priority, message.line.date, message.uid
message_unique_dict = {}
serialization_tag_dict = {}
distributable_uid_set = set()
deletable_uid_list = [] deletable_uid_list = []
# remove duplicates # remove duplicates
# SQLDict considers object_path, method_id, tag to unify activities, # SQLDict considers object_path, method_id, tag to unify activities,
# but ignores method arguments. They are outside of semantics. # but ignores method arguments. They are outside of semantics.
for key in message_dict.keys(): for message in message_dict.itervalues():
# we manipulate message_dict below so that we cannot use message_unique_dict.setdefault(self.generateMessageUID(message),
# iterator here. []).append(message)
message = message_dict[key] for message_list in message_unique_dict.itervalues():
unique_key = self.generateMessageUID(message) if len(message_list) > 1:
if unique_key in message_unique_set: # Sort list of duplicates to keep the message with highest score
deletable_uid_list.append(message.uid) message_list.sort(key=sort_message_key)
del message_dict[message.uid] deletable_uid_list += [m.uid for m in message_list[1:]]
else: message = message_list[0]
message_unique_set.add(unique_key) distributable_uid_set.add(message.uid)
serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is not None:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
# Don't let through if there is the same serialization tag in the # Don't let through if there is the same serialization tag in the
# message dict. If there is the same serialization tag, only one can # message dict. If there is the same serialization tag, only one can
# be validated and others must wait. # be validated and others must wait.
# But messages with group_method_id are exceptions. serialization_tag # But messages with group_method_id are exceptions. serialization_tag
# does not stop validating together. Because those messages should # does not stop validating together. Because those messages should
# be processed together at once. # be processed together at once.
serialization_tag_set = set() for message_list in serialization_tag_dict.itervalues():
serialization_tag_group_method_id_dict = {} if len(message_list) == 1:
for key in message_dict.keys(): continue
message = message_dict[key] # Sort list of messages to validate the message with highest score
# serialize messages with serialization_tag. message_list.sort(key=sort_message_key)
serialization_tag = message.activity_kw.get('serialization_tag') group_method_id = message_list[0].activity_kw.get('group_method_id')
group_method_id = message.activity_kw.get('group_method_id') for message in message_list[1:]:
if serialization_tag is not None: if group_method_id is None or \
if serialization_tag in serialization_tag_set: group_method_id != message.activity_kw.get('group_method_id'):
if group_method_id is not None: distributable_uid_set.remove(message.uid)
# Only one group_method_id can pass through.
if serialization_tag_group_method_id_dict.get(
serialization_tag,None) != group_method_id:
del message_dict[message.uid]
else:
del message_dict[message.uid]
else:
serialization_tag_set.add(serialization_tag)
if group_method_id is not None:
serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
if deletable_uid_list: if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table, activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list) uid=deletable_uid_list)
distributable_count = len(distributable_uid_set)
distributable_count = len(message_dict) if distributable_count:
if distributable_count: activity_tool.SQLBase_assignMessage(table=self.sql_table,
activity_tool.SQLBase_assignMessage(table=self.sql_table, processing_node=0, uid=tuple(distributable_uid_set))
processing_node=0, uid=[m.uid for m in message_dict.itervalues()]) validated_count += distributable_count
validated_count += distributable_count
if validated_count < MAX_VALIDATED_LIMIT: if validated_count < MAX_VALIDATED_LIMIT:
offset += READ_MESSAGE_LIMIT offset += READ_MESSAGE_LIMIT
result = readMessageList(path=None, method_id=None, processing_node=-1, result = readMessageList(path=None, method_id=None, processing_node=-1,
......
...@@ -2720,14 +2720,16 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2720,14 +2720,16 @@ class TestCMFActivity(ERP5TypeTestCase):
try: try:
Organisation.checkActivityCount = checkActivityCount Organisation.checkActivityCount = checkActivityCount
# Adds two same activities. # Adds two same activities.
organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a') organisation.activate(activity='SQLDict', tag='a', priority=2).checkActivityCount(other_tag='a')
get_transaction().commit() get_transaction().commit()
organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a') uid1, = [x.uid for x in activity_tool.getMessageList()]
organisation.activate(activity='SQLDict', tag='a', priority=1).checkActivityCount(other_tag='a')
get_transaction().commit() get_transaction().commit()
self.assertEqual(len(activity_tool.getMessageList()), 2) self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute() activity_tool.distribute()
# After distribute, duplicate is deleted. # After distribute, duplicate is deleted.
self.assertEqual(len(activity_tool.getMessageList()), 1) uid2, = [x.uid for x in activity_tool.getMessageList()]
self.assertNotEqual(uid1, uid2)
self.tic() self.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0) self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(len(check_result_dict), 1) self.assertEqual(len(check_result_dict), 1)
...@@ -2886,9 +2888,9 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2886,9 +2888,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEqual(len(result), 0) self.assertEqual(len(result), 0)
# Second scenario: activate, activate, distribute # Second scenario: activate, activate, distribute
# Both messages must be distributed (this is different from regular tags) # Both messages must be distributed (this is different from regular tags)
organisation.activate(activity=activity, serialization_tag='1').getTitle() organisation.activate(activity=activity, serialization_tag='1', priority=2).getTitle()
# Use a different method just so that SQLDict doesn't merge both activities prior to insertion. # Use a different method just so that SQLDict doesn't merge both activities prior to insertion.
organisation.activate(activity=activity, serialization_tag='1').getId() organisation.activate(activity=activity, serialization_tag='1', priority=1).getId()
get_transaction().commit() get_transaction().commit()
result = activity_tool.getMessageList() result = activity_tool.getMessageList()
self.assertEqual(len(result), 2) self.assertEqual(len(result), 2)
...@@ -2899,9 +2901,11 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2899,9 +2901,11 @@ class TestCMFActivity(ERP5TypeTestCase):
# If activity is SQLQueue, this does not happen. # If activity is SQLQueue, this does not happen.
if activity=='SQLDict': if activity=='SQLDict':
# one is validated. # one is validated.
self.assertEqual(len([x for x in result if x.processing_node == 0]), 1) message, = [x for x in result if x.processing_node == 0]
self.assertEqual(message.method_id, 'getId')
# the other one is still waiting for validation. # the other one is still waiting for validation.
self.assertEqual(len([x for x in result if x.processing_node == -1]), 1) message, = [x for x in result if x.processing_node == -1]
self.assertEqual(message.method_id, 'getTitle')
else: else:
# both are validated at once. # both are validated at once.
self.assertEqual(len([x for x in result if x.processing_node == 0]), 2) self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment