From 283f265407cae2837a6f4d4b28f115152f16b093 Mon Sep 17 00:00:00 2001
From: Sebastien Robin <seb@nexedi.com>
Date: Tue, 28 Sep 2004 09:34:27 +0000
Subject: [PATCH] updated SQLQueue in order to support more validation

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1790 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/SQLQueue.py | 74 +++++++++++++++++++-----
 1 file changed, 60 insertions(+), 14 deletions(-)

diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py
index dd403e2f83..180ef10a4e 100755
--- a/product/CMFActivity/Activity/SQLQueue.py
+++ b/product/CMFActivity/Activity/SQLQueue.py
@@ -29,7 +29,8 @@
 import random
 from Products.CMFActivity.ActivityTool import registerActivity
 from RAMQueue import RAMQueue
-from Queue import VALID
+from DateTime import DateTime
+from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY
 from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
 
 from zLOG import LOG
@@ -58,7 +59,8 @@ class SQLQueue(RAMQueue):
       activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
                                           method_id = m.method_id,
                                           priority = m.activity_kw.get('priority', 1),
-                                          message = self.dumpMessage(m))
+                                          message = self.dumpMessage(m),
+                                          date = m.activity_kw.get('at_date', DateTime()))
 
   def prepareDeleteMessage(self, activity_tool, m):
     # Erase all messages in a single transaction
@@ -67,12 +69,16 @@ class SQLQueue(RAMQueue):
     
   def dequeueMessage(self, activity_tool, processing_node):
     if hasattr(activity_tool,'SQLQueue_readMessageList'):
+      now_date = DateTime()
+      # Next processing date in case of error
+      next_processing_date = now_date + VALIDATION_ERROR_DELAY
       priority = random.choice(priority_weight)
       # Try to find a message at given priority level
-      result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority)
+      result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority,
+                                                  to_date=now_date)
       if len(result) == 0:
         # If empty, take any message
-        result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None)
+        result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None,to_date=now_date)
       if len(result) > 0:
         line = result[0]
         path = line.path
@@ -82,16 +88,23 @@ class SQLQueue(RAMQueue):
         get_transaction().commit() # Release locks before starting a potentially long calculation
         m = self.loadMessage(line.message)
         # Make sure object exists
-        if m.validate(self, activity_tool) is not VALID:
-          if line.priority > MAX_PRIORITY:
-            # This is an error
-            activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
-                                                                              # Assign message back to 'error' state
-            #m.notifyUser(activity_tool)                                       # Notify Error
-            get_transaction().commit()                                        # and commit
+        validation_state = m.validate(self, activity_tool)
+        if validation_state is not VALID:
+          if validation_state in (EXCEPTION, INVALID_PATH):          
+            if line.priority > MAX_PRIORITY:
+              # This is an error
+              activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
+                                                                                # Assign message back to 'error' state
+              #m.notifyUser(activity_tool)                                       # Notify Error
+              get_transaction().commit()                                        # and commit
+            else:
+              # Lower priority
+              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+              get_transaction().commit() # Release locks before starting a potentially long calculation
           else:
-            # Lower priority
-            activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+            # We do not lower priority for INVALID_ORDER errors but we do postpone execution
+            activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
+                                                priority = line.priority)
             get_transaction().commit() # Release locks before starting a potentially long calculation
         else:
           # Try to invoke
@@ -109,7 +122,8 @@ class SQLQueue(RAMQueue):
               get_transaction().commit()                                        # and commit
             else:
               # Lower priority
-              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
+              activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
+                                                 priority = line.priority + 1)
               get_transaction().commit() # Release locks before starting a potentially long calculation
         return 0
       get_transaction().commit() # Release locks before starting a potentially long calculation
@@ -207,4 +221,36 @@ class SQLQueue(RAMQueue):
         if processing_node > node_count:
           processing_node = 1 # Round robin
 
+  # Validation private methods
+  def _validate_after_method_id(self, activity_tool, message, value):
+    # Count number of occurances of method_id
+    LOG('SQLQueue._validate_after_method_id, message',0,message)
+    LOG('SQLQueue._validate_after_method_id, value',0,value)
+    result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
+    if result[0].uid_count > 0:
+      return INVALID_ORDER
+    return VALID
+            
+  def _validate_after_path(self, activity_tool, message, value):
+    # Count number of occurances of path
+    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
+    if result[0].uid_count > 0:
+      return INVALID_ORDER
+    return VALID
+            
+  def _validate_after_message_uid(self, activity_tool, message, value):
+    # Count number of occurances of message_uid
+    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
+    if result[0].uid_count > 0:
+      return INVALID_ORDER
+    return VALID
+
+  # Required for tests (time shift)        
+  def timeShift(self, activity_tool, delay):    
+    """
+      To simulate timeShift, we simply substract delay from
+      all dates in SQLDict message table
+    """
+    activity_tool.SQLQueue_timeShift(delay = delay * SECONDS_IN_DAY)
+
 registerActivity(SQLQueue)
-- 
2.30.9