From 5cb051d574711fe4409f1384e839fe1117d8e0e6 Mon Sep 17 00:00:00 2001
From: Vincent Pelletier <vincent@nexedi.com>
Date: Wed, 9 Jul 2008 14:20:19 +0000
Subject: [PATCH] Offer a method to trigger a clean shutdown.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@22388 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/ActivityTool.py | 28 ++++++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py
index 402753a344..4fde546b00 100644
--- a/product/CMFActivity/ActivityTool.py
+++ b/product/CMFActivity/ActivityTool.py
@@ -73,6 +73,7 @@ max_active_threads = 1 # 2 will cause more bug to appear (he he)
 is_initialized = False
 tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
 timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
+is_running_lock = threading.Lock()
 first_run = True
 currentNode = None
 ROLE_IDLE = 0
@@ -375,6 +376,11 @@ class ActivityTool (Folder, UniqueObject):
     
     distributingNode = ''
     _nodes = ()
+    # Set to False when shutting down. Access outside of process_shutdown must
+    # be done under the protection of is_running_lock lock.
+    _is_running = True
+    # True when activities cannot be executing any more.
+    _has_processed_shutdown = False
 
     def __init__(self):
         return Folder.__init__(self, ActivityTool.id)
@@ -592,6 +598,19 @@ class ActivityTool (Folder, UniqueObject):
           '/manageLoadBalancing?manage_tabs_message=' +
           urllib.quote(message))
 
+    def process_shutdown(self, phase, time_in_phase):
+        """
+          Prevent shutdown from happening while an activity queue is
+          processing a batch.
+        """
+        self._is_running = False
+        if phase == 3 and not self._has_processed_shutdown:
+          self._has_processed_shutdown = True
+          LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
+          is_running_lock.acquire()
+          LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
+          is_running_lock.release()
+
     def process_timer(self, tick, interval, prev="", next=""):
         """
         Call distribute() if we are the Distributing Node and call tic()
@@ -706,8 +725,13 @@ class ActivityTool (Folder, UniqueObject):
         while has_awake_activity:
           has_awake_activity = 0
           for activity in activity_list:
-            activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
-            has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
+            is_running_lock.acquire()
+            try:
+              if self._is_running:
+                activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
+                has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
+            finally:
+              is_running_lock.release()
       finally:
         # decrease the number of active_threads
         tic_lock.acquire()
-- 
2.30.9