Commit 1de2d44b authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: always process the queue with higher priority instead of round robin

parent 8bfcf2b3
...@@ -81,8 +81,6 @@ class Queue: ...@@ -81,8 +81,6 @@ class Queue:
#scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage'] #scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']
def __init__(self): def __init__(self):
self.is_alive = {}
self.is_awake = {}
self.is_initialized = 0 self.is_initialized = 0
def initialize(self, activity_tool): def initialize(self, activity_tool):
...@@ -105,24 +103,9 @@ class Queue: ...@@ -105,24 +103,9 @@ class Queue:
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
pass pass
def tic(self, activity_tool, processing_node):
# Tic should return quickly to prevent locks or commit transactions at some point
if self.dequeueMessage(activity_tool, processing_node):
self.sleep(activity_tool, processing_node)
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
pass pass
def sleep(self, activity_tool, processing_node):
self.is_awake[processing_node] = 0
def wakeup(self, activity_tool, processing_node):
self.is_awake[processing_node] = 1
def terminate(self, activity_tool, processing_node):
self.is_awake[processing_node] = 0
self.is_alive[processing_node] = 0
def validate(self, activity_tool, message, check_order_validation=1, **kw): def validate(self, activity_tool, message, check_order_validation=1, **kw):
""" """
This is the place where activity semantics is implemented This is the place where activity semantics is implemented
...@@ -220,9 +203,6 @@ class Queue: ...@@ -220,9 +203,6 @@ class Queue:
else: else:
pass pass
def isAwake(self, activity_tool, processing_node):
return self.is_awake[processing_node]
def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw): def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
return 0 return 0
...@@ -320,8 +300,7 @@ class Queue: ...@@ -320,8 +300,7 @@ class Queue:
""" """
Get priority from this queue. Get priority from this queue.
Lower number means higher priority value. Lower number means higher priority value.
Legal value range is [1, 6]. Legal value range is [-128, 127].
Values out of this range might work, but are non-standard. Values out of this range might work, but are non-standard.
""" """
return 6 return 128
...@@ -962,25 +962,26 @@ class ActivityTool (Folder, UniqueObject): ...@@ -962,25 +962,26 @@ class ActivityTool (Folder, UniqueObject):
inner_self = aq_inner(self) inner_self = aq_inner(self)
try: try:
#Sort activity list by priority # Loop as long as there are activities. Always process the queue with
activity_list = sorted(activity_dict.itervalues(), # "highest" priority. If several queues have same highest priority, do
key=lambda activity: activity.getPriority(self)) # not choose one that has just been processed.
# This algorithm is fair enough because we actually use only 2 queues.
# Wakeup each queue # Otherwise, a round-robin of highest-priority queues would be required.
for activity in activity_list: # XXX: We always finish by iterating over all queues, in case that
activity.wakeup(inner_self, processing_node) # getPriority does not see messages dequeueMessage would process.
last = None
# Process messages on each queue in round robin def sort_key(activity):
has_awake_activity = 1 return activity.getPriority(self), activity is last
while has_awake_activity: while is_running_lock.acquire(0):
has_awake_activity = 0 try:
for activity in activity_list: for last in sorted(activity_dict.itervalues(), key=sort_key):
if is_running_lock.acquire(0): # Transaction processing is the responsability of the activity
try: if not last.dequeueMessage(inner_self, processing_node):
activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity break
has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node) else:
finally: break
is_running_lock.release() finally:
is_running_lock.release()
finally: finally:
# decrease the number of active_threads # decrease the number of active_threads
tic_lock.acquire() tic_lock.acquire()
......
...@@ -3184,16 +3184,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3184,16 +3184,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
rendez_vous_event.set() rendez_vous_event.set()
# When this event is available, it means test has called process_shutdown. # When this event is available, it means test has called process_shutdown.
activity_event.wait() activity_event.wait()
from Products.CMFActivity.Activity.Queue import Queue from Products.CMFActivity.Activity.SQLDict import SQLDict
original_queue_tic = Queue.tic original_dequeue = SQLDict.dequeueMessage
queue_tic_test_dict = {} queue_tic_test_dict = {}
def Queue_tic(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
result = original_queue_tic(self, activity_tool, processing_node)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
# This is a one-shot method, revert after execution # This is a one-shot method, revert after execution
Queue.tic = original_queue_tic SQLDict.dequeueMessage = original_dequeue
result = self.dequeueMessage(activity_tool, processing_node)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
return result return result
Queue.tic = Queue_tic SQLDict.dequeueMessage = dequeueMessage
Organisation.waitingActivity = waitingActivity Organisation.waitingActivity = waitingActivity
try: try:
# Use SQLDict with no group method so that both activities won't be # Use SQLDict with no group method so that both activities won't be
...@@ -3258,7 +3258,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3258,7 +3258,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
pass pass
finally: finally:
delattr(Organisation, 'waitingActivity') delattr(Organisation, 'waitingActivity')
Queue.tic = original_queue_tic SQLDict.dequeueMessage = original_dequeue
def test_hasActivity(self): def test_hasActivity(self):
active_object = self.portal.organisation_module.newContent( active_object = self.portal.organisation_module.newContent(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment