Commit c29d48a9 authored by Jérome Perrin's avatar Jérome Perrin

ACO: Distribute calculation over a cluster of machines

Currently implemented distributor is ERP5. See erp5_test_result from
https://github.com/erp5/erp5/tree/dream_distributor
The URL must be point to a DREAM Simulation Distributor, something like
http://localhost:55163/erp5/portal_task_distribution/dream_distributor
parent a3aae79d
...@@ -3,6 +3,7 @@ import json ...@@ -3,6 +3,7 @@ import json
import time import time
import random import random
import operator import operator
import xmlrpclib
from dream.simulation.GUI.Default import Simulation as DefaultSimulation from dream.simulation.GUI.Default import Simulation as DefaultSimulation
from dream.simulation.Queue import Queue from dream.simulation.Queue import Queue
...@@ -30,6 +31,14 @@ class Simulation(DefaultSimulation): ...@@ -30,6 +31,14 @@ class Simulation(DefaultSimulation):
"name": "Number of solutions", "name": "Number of solutions",
"_class": "Dream.Property", "_class": "Dream.Property",
"_default": 4} ) "_default": 4} )
conf["Dream-Configuration"]["property_list"].append(
{ "id": "distributorURL",
"type": "string",
"name": "Distributor URL",
"description": "URL of an ERP5 Distributor, see "
"https://github.com/erp5/erp5/tree/dream_distributor",
"_class": "Dream.Property",
"_default": ''} )
return conf return conf
def _preprocess(self, data): def _preprocess(self, data):
...@@ -59,8 +68,13 @@ class Simulation(DefaultSimulation): ...@@ -59,8 +68,13 @@ class Simulation(DefaultSimulation):
def run(self, data): def run(self, data):
data = self._preprocess(data) data = self._preprocess(data)
distributor_url = data['general']['distributorURL']
distributor = None
if distributor_url:
distributor = xmlrpclib.Server(distributor_url)
tested_ants = set() tested_ants = set()
start=time.time() # start counting execution time start = time.time() # start counting execution time
# the list of options collated into a dictionary for ease of referencing in # the list of options collated into a dictionary for ease of referencing in
# ManPy # ManPy
...@@ -74,9 +88,11 @@ class Simulation(DefaultSimulation): ...@@ -74,9 +88,11 @@ class Simulation(DefaultSimulation):
ants = [] #list of ants for keeping track of their performance ants = [] #list of ants for keeping track of their performance
# Number of times new ants are to be created, i.e. number of generations (a # Number of times new ants are to be created, i.e. number of generations (a
# generation can have more than 1 ant) # generation can have more than 1 ant)
for i in range(data["general"]["numberOfGenerations"]): for i in range(data["general"]["numberOfGenerations"]):
scenario_list = [] # for the distributor
# number of ants created per generation # number of ants created per generation
for j in range(data["general"]["numberOfAntsPerGenerations"]): for j in range(data["general"]["numberOfAntsPerGenerations"]):
# an ant dictionary to contain rule to queue assignment information # an ant dictionary to contain rule to queue assignment information
...@@ -91,22 +107,42 @@ class Simulation(DefaultSimulation): ...@@ -91,22 +107,42 @@ class Simulation(DefaultSimulation):
if ant_key not in tested_ants: if ant_key not in tested_ants:
tested_ants.add(ant_key) tested_ants.add(ant_key)
# the current ant to be simulated (evaluated) is added to the
# ants list
ants.append(ant)
# set scheduling rule on queues based on ant data # set scheduling rule on queues based on ant data
ant_data = copy(data) ant_data = copy(data)
for k, v in ant.items(): for k, v in ant.items():
ant_data["nodes"][k]['schedulingRule'] = v ant_data["nodes"][k]['schedulingRule'] = v
ant['key'] = ant_key ant['key'] = ant_key
# TODO: those two steps have to be parallelized
ant['result'] = DefaultSimulation.runOneScenario(self, ant_data)
ant['input'] = ant_data ant['input'] = ant_data
scenario_list.append(ant)
if distributor is None:
# synchronous
for ant in scenario_list:
ant['result'] = DefaultSimulation.runOneScenario(self, ant['input'])
else: # asynchronous
job_id = distributor.requestSimulationRun(
[json.dumps(x) for x in scenario_list])
print "Job registered", job_id
while True:
time.sleep(1.)
result_list = distributor.getJobResult(job_id)
# The distributor returns None when calculation is still ongoing,
# or the list of result in the same order.
if result_list is not None:
print "Job terminated"
break
for ant, result in zip(scenario_list, result_list):
ant['result'] = json.loads(result)
for ant in scenario_list:
ant['score'] = self._calculateAntScore(ant) ant['score'] = self._calculateAntScore(ant)
ants.extend(scenario_list)
# remove ants that outputs the same schedules # remove ants that outputs the same schedules
ants_without_duplicates = dict() ants_without_duplicates = dict()
for ant in ants: for ant in ants:
...@@ -130,4 +166,5 @@ class Simulation(DefaultSimulation): ...@@ -130,4 +166,5 @@ class Simulation(DefaultSimulation):
# selected by the next ants. # selected by the next ants.
collated[m].append(l[m]) collated[m].append(l[m])
print "ACO finished, execution time %0.2fs" % (time.time() - start)
return ants return ants
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment