Commit 80a963cb authored by Vincent Pelletier's avatar Vincent Pelletier

Implement inter-queue priorities.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19389 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 28094ef6
......@@ -348,3 +348,13 @@ class Queue:
delay is provided in fractions of day
"""
pass
def getPriority(self, activity_tool):
"""
Get priority from this queue.
Lower number means higher priority value.
Legal value range is [1, 6].
Values out of this range might work, but are non-standard.
"""
return 6
......@@ -41,3 +41,12 @@ class SQLBase:
assert len(result) == 1
assert len(result[0]) == 1
return result[0][0]
def _getPriority(self, activity_tool, method, default):
now_date = self.getNow(activity_tool)
result = method(to_date=now_date)
assert len(result) == 1
priority = result[0]['priority']
if priority is None:
priority = default
return priority
......@@ -720,4 +720,9 @@ class SQLDict(RAMDict, SQLBase):
"""
activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
def getPriority(self, activity_tool):
method = activity_tool.SQLDict_getPriority
default = RAMDict.getPriority(self, activity_tool)
return self._getPriority(activity_tool, method, default)
registerActivity(SQLDict)
......@@ -542,4 +542,9 @@ class SQLQueue(RAMQueue, SQLBase):
"""
activity_tool.SQLQueue_timeShift(delay=delay, processing_node=processing_node)
def getPriority(self, activity_tool):
method = activity_tool.SQLQueue_getPriority
default = RAMQueue.getPriority(self, activity_tool)
return self._getPriority(activity_tool, method, default)
registerActivity(SQLQueue)
......@@ -661,15 +661,22 @@ class ActivityTool (Folder, UniqueObject):
first_run = False
try:
#Sort activity list by priority
activity_list = activity_dict.values()
# Sort method must be local to access "self"
def cmpActivities(activity_1, activity_2):
return cmp(activity_1.getPriority(self), activity_2.getPriority(self))
activity_list.sort(cmpActivities)
# Wakeup each queue
for activity in activity_dict.itervalues():
for activity in activity_list:
activity.wakeup(inner_self, processing_node)
# Process messages on each queue in round robin
has_awake_activity = 1
while has_awake_activity:
has_awake_activity = 0
for activity in activity_dict.itervalues():
for activity in activity_list:
activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
finally:
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
to_date
</params>
SELECT MIN(`priority`) AS `priority` FROM
message
WHERE
processing_node = 0
AND date <= <dtml-sqlvar to_date type="datetime">
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
to_date
</params>
SELECT MIN(`priority`) AS `priority` FROM
message_queue
WHERE
processing_node = 0
AND date <= <dtml-sqlvar to_date type="datetime">
......@@ -2625,6 +2625,58 @@ class TestCMFActivity(ERP5TypeTestCase):
finally:
delattr(Organisation, 'checkActivityCount')
def test_103_interQueuePriorities(self, quiet=0, run=run_all_test):
"""
Important note: there is no way to really reliably check that this
feature is correctly implemented, as activity execution order is
non-deterministic.
The best which can be done is to check that under certain circumstances
the activity exeicution order match expectations.
"""
if not run: return
if not quiet:
message = '\nCheck inter-queue priorities'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
get_transaction().commit()
self.tic()
activity_tool = self.getActivityTool()
check_result_dict = {}
def runAndCheck():
check_result_dict.clear()
get_transaction().commit()
self.assertEqual(len(check_result_dict), 0)
self.tic()
self.assertEqual(len(check_result_dict), 2)
self.assertTrue(check_result_dict['before_ran'])
self.assertTrue(check_result_dict['after_ran'])
def mustRunBefore(self):
check_result_dict['before_ran'] = 'after_ran' not in check_result_dict
def mustRunAfter(self):
check_result_dict['after_ran'] = 'before_ran' in check_result_dict
Organisation.mustRunBefore = mustRunBefore
Organisation.mustRunAfter = mustRunAfter
try:
# Check that ordering looks good (SQLQueue first)
organisation.activate(activity='SQLQueue', priority=1).mustRunBefore()
organisation.activate(activity='SQLDict', priority=2).mustRunAfter()
runAndCheck()
# Check that ordering looks good (SQLDict first)
organisation.activate(activity='SQLDict', priority=1).mustRunBefore()
organisation.activate(activity='SQLQueue', priority=2).mustRunAfter()
runAndCheck()
# Check that tag takes precedence over priority (SQLQueue first by priority)
organisation.activate(activity='SQLQueue', priority=1, after_tag='a').mustRunAfter()
organisation.activate(activity='SQLDict', priority=2, tag='a').mustRunBefore()
runAndCheck()
# Check that tag takes precedence over priority (SQLDict first by priority)
organisation.activate(activity='SQLDict', priority=1, after_tag='a').mustRunAfter()
organisation.activate(activity='SQLQueue', priority=2, tag='a').mustRunBefore()
runAndCheck()
finally:
delattr(Organisation, 'mustRunBefore')
delattr(Organisation, 'mustRunAfter')
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment