From c5edcba25f52534d49794d51a2e09d530b2541e6 Mon Sep 17 00:00:00 2001
From: Jean-Paul Smets <jp@nexedi.com>
Date: Wed, 28 Jul 2004 09:44:27 +0000
Subject: [PATCH] Moved sticky processing to readMessageList with delay var to
 reduce calls and potential conflicts

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1302 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/Queue.py         |  2 ++
 product/CMFActivity/Activity/SQLDict.py       | 22 ++++++++++++-------
 .../skins/activity/SQLDict_readMessage.zsql   | 12 +---------
 .../activity/SQLDict_readMessageList.zsql     | 15 +++++++++++--
 product/CMFActivity/tests/testCMFActivity.py  |  9 ++++++++
 5 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py
index 7283e04fea..4a500d0a90 100755
--- a/product/CMFActivity/Activity/Queue.py
+++ b/product/CMFActivity/Activity/Queue.py
@@ -28,6 +28,7 @@
 
 import pickle, sys
 from Acquisition import aq_base
+from DateTime import DateTime
 from Products.CMFActivity.ActivityTool import Message
 from zLOG import LOG
 
@@ -81,6 +82,7 @@ class Queue:
     self.is_alive = {}
     self.is_awake = {}
     self.is_initialized = 0
+    self.max_processing_date = DateTime()
 
   def initialize(self, activity_tool):
     # This is the only moment when
diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py
index 3aa8ed89f9..1aac3ed728 100755
--- a/product/CMFActivity/Activity/SQLDict.py
+++ b/product/CMFActivity/Activity/SQLDict.py
@@ -43,7 +43,7 @@ priority_weight = \
   [3] * 10 + \
   [4] * 5 + \
   [5] * 1
-
+   
 class ActivityFlushError(Exception):
     """Error during active message flush"""
 
@@ -108,14 +108,12 @@ class SQLDict(RAMDict):
   def dequeueMessage(self, activity_tool, processing_node):
     if hasattr(activity_tool,'SQLDict_readMessage'):
       now_date = DateTime()
-      # Sticky processing messages should be set back to non processing
-      max_processing_date = now_date - MAX_PROCESSING_TIME
       # Next processing date in case of error
       next_processing_date = now_date + VALIDATION_ERROR_DELAY
       priority = random.choice(priority_weight)
       # Try to find a message at given priority level which is scheduled for now
       result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
-                                                 to_date=now_date, to_processing_date = max_processing_date)
+                                                 to_date=now_date)
       if len(result) == 0:
         # If empty, take any message which is scheduled for now
         priority = None
@@ -261,18 +259,26 @@ class SQLDict(RAMDict):
     # YO: reading all lines might cause a deadlock
     message_list = []
     if hasattr(activity_tool,'SQLDict_readMessageList'):
-      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
+      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None)
       for line in result:
         m = self.loadMessage(line.message, uid = line.uid)
         m.processing_node = line.processing_node
         m.priority = line.priority
         message_list.append(m)
-    return message_list
-
+    return message_list        
+      
   def distribute(self, activity_tool, node_count):
     processing_node = 1
     if hasattr(activity_tool,'SQLDict_readMessageList'):
-      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
+      now_date = DateTime()
+      if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
+        # Sticky processing messages should be set back to non processing
+        max_processing_date = now_date - MAX_PROCESSING_TIME
+        self.max_processing_date = now_date
+      else:
+        max_processing_date = None     
+      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
+                                                     to_processing_date = max_processing_date) # Only assign non assigned messages
       get_transaction().commit() # Release locks before starting a potentially long calculation
       path_dict = {}
       for line in result:
diff --git a/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql
index b3411f0e09..2580562a84 100755
--- a/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql
@@ -11,17 +11,7 @@ class_file:
 priority
 to_date
 to_processing_date</params>
-<dtml-if to_processing_date>UPDATE message
-SET
-  processing = 0
-WHERE
-  processing = 1
-AND
-  processing_date < <dtml-sqlvar to_processing_date type="string">
-
-<dtml-var "'\0'">
-
-</dtml-if>SELECT * FROM
+SELECT * FROM
     message
 WHERE
     processing <> 1
diff --git a/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
index 3e3c043ddb..e42b7def0a 100755
--- a/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql
@@ -10,8 +10,19 @@ class_file:
 <params>path
 method_id
 processing_node
-priority</params>
-SELECT * FROM
+priority
+to_processing_date</params>
+<dtml-if to_processing_date>UPDATE message
+SET
+  processing = 0
+WHERE
+  processing = 1
+AND
+  processing_date < <dtml-sqlvar to_processing_date type="string">
+  
+<dtml-var "'\0'">
+
+</dtml-if>SELECT * FROM
     message
 WHERE
     processing <> 1
diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py
index dec9e1cff4..3ee307bc8b 100755
--- a/product/CMFActivity/tests/testCMFActivity.py
+++ b/product/CMFActivity/tests/testCMFActivity.py
@@ -207,10 +207,19 @@ class TestCMFActivity(ERP5TypeTestCase):
     organisation.setTitle(self.title1)
     organisation.activate(activity=activity).setTitle(self.title2)
     organisation.flushActivity(invoke=1)
+    self.assertEquals(organisation.getTitle(),self.title2)
     get_transaction().commit()
     message_list = portal.portal_activities.getMessageList()
     self.assertEquals(len(message_list),0)
     self.assertEquals(organisation.getTitle(),self.title2)
+    # Try again with different commit order
+    organisation.setTitle(self.title1)
+    organisation.activate(activity=activity).setTitle(self.title2)
+    get_transaction().commit()
+    organisation.flushActivity(invoke=1)
+    self.assertEquals(len(message_list),0)
+    self.assertEquals(organisation.getTitle(),self.title2)
+    get_transaction().commit()
 
   def TryActivateInsideFlush(self, activity):
     """
-- 
2.30.9