From e5eb6c703e1e694b0ea548c12647fb63ff4c5453 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=A9rome=20Perrin?= <jerome@nexedi.com>
Date: Tue, 21 Apr 2015 09:43:39 +0200
Subject: [PATCH] ACO: simpler distribution using parrallel subprocesses

---
 dream/plugins/ACO.py                | 32 ++++++++++++++++++++++++-----
 dream/plugins/Batches/BatchesACO.py |  1 +
 dream/plugins/plugin.py             |  3 ++-
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/dream/plugins/ACO.py b/dream/plugins/ACO.py
index 7a8e9258..2fb15b2a 100644
--- a/dream/plugins/ACO.py
+++ b/dream/plugins/ACO.py
@@ -6,13 +6,19 @@ import time
 import random
 import operator
 import xmlrpclib
+import signal
+from multiprocessing import Pool
 
 from dream.simulation.Queue import Queue
 from dream.simulation.Operator import Operator
 from dream.simulation.Globals import getClassFromName
 
-class ACO(plugin.ExecutionPlugin):
+# run an ant in a subrocess. Can be parrallelized.
+def runAntInSubProcess(ant):
+  ant['result'] = plugin.ExecutionPlugin.runOneScenario(ant['input'])['result']
+  return ant
 
+class ACO(plugin.ExecutionPlugin):
   def _calculateAntScore(self, ant):
     """Calculate the score of this ant.
     """
@@ -57,6 +63,8 @@ class ACO(plugin.ExecutionPlugin):
     if distributor_url:
         distributor = xmlrpclib.Server(distributor_url)
 
+    multiprocessorCount = data['general'].get('multiprocessorCount')
+
     tested_ants = set()
     start = time.time()         # start counting execution time
 
@@ -97,10 +105,24 @@ class ACO(plugin.ExecutionPlugin):
                 scenario_list.append(ant)
                        
         if distributor is None:
-            # synchronous
-            for ant in scenario_list:
-                ant['result'] = self.runOneScenario(ant['input'])['result']
-
+            if multiprocessorCount:
+                self.logger.info("running multiprocessing ACO with %s processes" % multiprocessorCount)
+                # We unset our signal handler to print traceback at the end
+                # otherwise logs are confusing. 
+                sigterm_handler = signal.getsignal(signal.SIGTERM)
+                pool = Pool(processes=multiprocessorCount)
+                try:
+                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                    scenario_list = pool.map(runAntInSubProcess, scenario_list)
+                    pool.close()
+                    pool.join()
+                finally:
+                    signal.signal(signal.SIGTERM, sigterm_handler)
+            else:
+                # synchronous
+                for ant in scenario_list:
+                    ant['result'] = self.runOneScenario(ant['input'])['result']
+        
         else: # asynchronous
             self.logger.info("Registering a job for %s scenarios" % len(scenario_list))
             start_register = time.time()
diff --git a/dream/plugins/Batches/BatchesACO.py b/dream/plugins/Batches/BatchesACO.py
index 2ff5c74a..65f6a7d1 100644
--- a/dream/plugins/Batches/BatchesACO.py
+++ b/dream/plugins/Batches/BatchesACO.py
@@ -83,6 +83,7 @@ class BatchesACO(ACO):
     # else run ACO
     data['general']['numberOfSolutions']=1  # default of 1 solution for this instance
     data["general"]["distributorURL"]=None  # no distributor currently, to be added in the GUI
+    data["general"]["multiprocessorCount"] = 8 # number of parrallel processes, to be added to the GUI
     ACO.run(self, data)
     data["result"]["result_list"][-1]["score"] = ''
     data["result"]["result_list"][-1]["key"] = "Go To Results Page"
diff --git a/dream/plugins/plugin.py b/dream/plugins/plugin.py
index 98ee4b3f..fe669471 100644
--- a/dream/plugins/plugin.py
+++ b/dream/plugins/plugin.py
@@ -59,7 +59,8 @@ class Plugin(object):
 class ExecutionPlugin(Plugin):
   """Plugin to handle the execution of multiple simulation runs.
   """
-  def runOneScenario(self, data):
+  @staticmethod
+  def runOneScenario(data):
     """default method for running one scenario
     """
     return json.loads(simulate_line_json(input_data=json.dumps(data)))
-- 
2.30.9