From b070a73cb7234a808362bb12d1fef252c54edccd Mon Sep 17 00:00:00 2001
From: Yoshinori Okuji <yo@nexedi.com>
Date: Tue, 25 Jan 2005 16:43:51 +0000
Subject: [PATCH] Add support for multiple after methods and broadcast.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@2293 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/SQLQueue.py      | 50 +++++++++++++------
 .../activity/SQLQueue_assignMessage.zsql      |  5 ++
 .../activity/SQLQueue_createMessageTable.zsql |  1 +
 .../SQLQueue_validateMessageList.zsql         |  8 ++-
 .../skins/activity/SQLQueue_writeMessage.zsql |  5 +-
 5 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py
index 180ef10a4e..f188c24840 100755
--- a/product/CMFActivity/Activity/SQLQueue.py
+++ b/product/CMFActivity/Activity/SQLQueue.py
@@ -59,6 +59,7 @@ class SQLQueue(RAMQueue):
       activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
                                           method_id = m.method_id,
                                           priority = m.activity_kw.get('priority', 1),
+                                          broadcast = m.activity_kw.get('broadcast', 0),
                                           message = self.dumpMessage(m),
                                           date = m.activity_kw.get('at_date', DateTime()))
 
@@ -66,7 +67,7 @@ class SQLQueue(RAMQueue):
     # Erase all messages in a single transaction
     LOG("prepareDeleteMessage", 0, str(m.__dict__))
     activity_tool.SQLQueue_delMessage(uid = m.uid)
-    
+
   def dequeueMessage(self, activity_tool, processing_node):
     if hasattr(activity_tool,'SQLQueue_readMessageList'):
       now_date = DateTime()
@@ -90,7 +91,7 @@ class SQLQueue(RAMQueue):
         # Make sure object exists
         validation_state = m.validate(self, activity_tool)
         if validation_state is not VALID:
-          if validation_state in (EXCEPTION, INVALID_PATH):          
+          if validation_state in (EXCEPTION, INVALID_PATH):
             if line.priority > MAX_PRIORITY:
               # This is an error
               activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
@@ -212,32 +213,49 @@ class SQLQueue(RAMQueue):
       #LOG('distribute count',0,str(len(result)) )
       #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
       #get_transaction().commit() # Release locks before starting a potentially long calculation
-      uid_list = map(lambda x:x.uid, result)[0:100]
-      for uid in uid_list:
-        #LOG("distribute", 0, "assign %s" % uid)
-        activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
-        #get_transaction().commit() # Release locks immediately to allow processing of messages
-        processing_node = processing_node + 1
-        if processing_node > node_count:
-          processing_node = 1 # Round robin
+      result = list(result)[0:100]
+      for line in result:
+        broadcast = line.broadcast
+        uid = line.uid
+        if broadcast:
+          # Broadcast messages must be distributed into all nodes.
+          activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid)
+          for node in range(2, node_count+1):
+            activity_tool.SQLQueue_writeMessage( path = line.path,
+                                                method_id = line.method_id,
+                                                priority = line.priority,
+                                                broadcast = 1,
+                                                processing_node = node,
+                                                message = line.message,
+                                                date = line.date)
+        else:
+          #LOG("distribute", 0, "assign %s" % uid)
+          activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
+          #get_transaction().commit() # Release locks immediately to allow processing of messages
+          processing_node = processing_node + 1
+          if processing_node > node_count:
+            processing_node = 1 # Round robin
 
   # Validation private methods
   def _validate_after_method_id(self, activity_tool, message, value):
     # Count number of occurances of method_id
-    LOG('SQLQueue._validate_after_method_id, message',0,message)
-    LOG('SQLQueue._validate_after_method_id, value',0,value)
+    #get_transaction().commit()
+    if type(value) == type(''):
+      value = [value]
     result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
+    LOG('SQLQueue._validate_after_method_id, method_id',0,value)
+    LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-            
+
   def _validate_after_path(self, activity_tool, message, value):
     # Count number of occurances of path
     result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-            
+
   def _validate_after_message_uid(self, activity_tool, message, value):
     # Count number of occurances of message_uid
     result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
@@ -245,8 +263,8 @@ class SQLQueue(RAMQueue):
       return INVALID_ORDER
     return VALID
 
-  # Required for tests (time shift)        
-  def timeShift(self, activity_tool, delay):    
+  # Required for tests (time shift)
+  def timeShift(self, activity_tool, delay):
     """
       To simulate timeShift, we simply substract delay from
       all dates in SQLDict message table
diff --git a/product/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql b/product/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
index 4bddd48b80..74dd888ddf 100755
--- a/product/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_assignMessage.zsql
@@ -10,6 +10,7 @@ class_file:
 <params>path
 processing_node
 method_id
+broadcast
 uid:int=0</params>
 UPDATE message_queue
 SET
@@ -19,3 +20,7 @@ WHERE
 <dtml-if path> path = <dtml-sqlvar path type="string"> 
 <dtml-else> uid = <dtml-sqlvar uid type="int"> </dtml-if>
 <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
+<dtml-if broadcast>
+  AND broadcast = <dtml-sqlvar broadcast type="int">
+</dtml-if>
+
diff --git a/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
index dc41980598..20a88e99a6 100755
--- a/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
@@ -17,6 +17,7 @@ CREATE TABLE `message_queue` (
   `processing` INT DEFAULT 0,
   `processing_date` datetime,
   `priority` INT DEFAULT 0,
+  `broadcast` INT DEFAULT 0,
   `message` BLOB,
   PRIMARY KEY  (`uid`),
   KEY `date` (`date`),
diff --git a/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql b/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
index 10c1e77255..37292bcd34 100755
--- a/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
@@ -17,6 +17,12 @@ FROM
     message_queue
 WHERE
     processing_node >= -1
-<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
+<dtml-if method_id>
+    AND (
+<dtml-in method_id>
+        method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
+</dtml-in>
+    )
+</dtml-if>
 <dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
 <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
diff --git a/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql b/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
index dd80be8463..2600808db4 100755
--- a/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
@@ -11,13 +11,16 @@ class_file:
 method_id
 message
 priority
+broadcast
+processing_node=-1
 date</params>
 INSERT INTO message_queue
 SET
 	path = <dtml-sqlvar path type="string">,
   <dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if> 
 	method_id = <dtml-sqlvar method_id type="string">,
-	processing_node = -1,
+	processing_node = <dtml-sqlvar processing_node type="int">,
+	broadcast = <dtml-sqlvar broadcast type="int">,
 	processing = -1,
 	priority = <dtml-sqlvar priority type="int">,
 	message = <dtml-sqlvar message type="string">
-- 
2.30.9