Commit 049fdd57 authored by Hardik Juneja's avatar Hardik Juneja Committed by Hardik Juneja

Some clean up

parent 53bf9d3e
import time
import numpy as np
from Products.ERP5Type.Log import log
import sklearn
from sklearn.externals import joblib
from sklearn.externals.joblib.parallel import parallel_backend
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
def test(self, active_process_path):
    """Time an SVC grid search on the digits dataset run through CMFActivity.

    active_process_path -- path, traversable from portal_activities, of the
                           active process handed to the joblib backend

    Returns a tuple: the string 'ok', the scikit-learn version, the joblib
    version and the seconds elapsed since just before the fit started.
    """
    dataset = load_digits()
    samples = np.ascontiguousarray(dataset.data)
    labels = np.ascontiguousarray(dataset.target)

    # 3 values of C x 3 values of gamma x 1 tolerance.
    search_space = {
        'C': np.logspace(-10, 10, 3),
        'gamma': np.logspace(-10, 10, 3),
        'tol': [1e-4],
    }
    searcher = GridSearchCV(SVC(), param_grid=search_space, verbose=10)

    process = self.portal_activities.unrestrictedTraverse(active_process_path)
    started = time.time()
    # Every joblib call issued by fit() goes through the 'CMFActivity' backend.
    with parallel_backend('CMFActivity', n_jobs=2, active_process=process):
        searcher.fit(samples, labels)
    return 'ok', sklearn.__version__, joblib.__version__, time.time() - started
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Extension Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>joblibGridSearch</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>extension.erp5.joblibGridSearch</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Extension Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W: 5, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
</tuple>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Extension Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>joblibRandomForest</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>extension.erp5.joblibRandomForest</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Extension Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
from sklearn.externals.joblib.parallel import parallel_backend, Parallel, delayed
from Products.CMFActivity.ActiveResult import ActiveResult
import time
from math import sqrt
def sleepAndSqrt(num, delay=2):
    """Return the square root of ``num`` after sleeping for ``delay`` seconds.

    The pause simulates a slow job so that parallel dispatch is observable.

    num   -- non-negative number; math.sqrt raises ValueError for negatives
    delay -- seconds to sleep before computing (defaults to 2, the value
             that used to be hard-coded)
    """
    time.sleep(delay)
    return sqrt(num)
def test(self, active_process_path):
    """Compute sqrt(i**2) for i in 0..4 via the CMFActivity joblib backend.

    active_process_path -- path, traversable from portal_activities, of the
                           active process used by the backend and to which
                           the final result is posted

    Returns the ActiveResult that was posted to the active process.
    """
    active_process = self.portal_activities.unrestrictedTraverse(active_process_path)

    # Use CMFActivity as a backend for joblib
    with parallel_backend('CMFActivity', active_process=active_process):
        runner = Parallel(n_jobs=2, pre_dispatch='all', timeout=30, verbose=30)
        roots = runner(delayed(sleepAndSqrt)(i ** 2) for i in range(5))

    # Wrap the computed values in an active result, tag it with an id and
    # post it to the active process.
    # NOTE(review): ActiveProcess.postResult appears with both 'sig' and
    # 'signature' lookups in this change set -- confirm which attribute the
    # deployed version reads.
    posted = ActiveResult(result=roots)
    posted.sig = 12345
    active_process.postResult(posted)
    return posted
\ No newline at end of file
from copy import copy
import time
import numpy as np
from copy import copy
from math import sqrt
from Products.ERP5Type.Log import log
from Products.CMFActivity.ActiveResult import ActiveResult
from sklearn.base import clone
......@@ -9,7 +11,34 @@ from sklearn.utils import check_random_state
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.externals import joblib
from sklearn.externals.joblib.parallel import parallel_backend
from sklearn.externals.joblib.parallel import parallel_backend, Parallel, delayed
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
#
# Example: simple sqrt calculator
#
def example_simple_function(self, active_process_path):
    """Compute sqrt(i**2) for i in 0..4 via the CMFActivity joblib backend.

    active_process_path -- path, traversable from portal_activities, of the
                           active process used by the backend and to which
                           the final result is posted

    Returns the ActiveResult that was posted to the active process.
    """
    active_process = self.portal_activities.unrestrictedTraverse(active_process_path)

    # Use CMFActivity as a backend for joblib
    with parallel_backend('CMFActivity', active_process=active_process):
        runner = Parallel(n_jobs=2, pre_dispatch='all', timeout=30, verbose=30)
        roots = runner(delayed(sqrt)(i ** 2) for i in range(5))

    # Wrap the computed values in an active result, give it a signature so it
    # can be looked up later, and post it to the active process.
    posted = ActiveResult(result=roots)
    posted.signature = 12345
    active_process.postResult(posted)
    log("joblib activity result", posted)
    return posted
#
# Example: random forest function
#
def combine(all_ensembles):
final_ensemble = copy(all_ensembles[0])
......@@ -20,7 +49,6 @@ def combine(all_ensembles):
return final_ensemble
def train_model(model, X, y, sample_weight=None, random_state=None):
model.set_params(random_state=random_state)
if sample_weight is not None:
......@@ -30,7 +58,6 @@ def train_model(model, X, y, sample_weight=None, random_state=None):
return model
def grow_ensemble(base_model, X, y, sample_weight=None, n_estimators=1,
n_jobs=1, random_state=None):
random_state = check_random_state(random_state)
......@@ -44,16 +71,15 @@ def grow_ensemble(base_model, X, y, sample_weight=None, n_estimators=1,
return combine(results)
def test_function(self, active_process_path):
from sklearn.datasets import load_digits
def example_random_forest_function(self, active_process_path):
digits = load_digits()
X_train, X_test, y_train, y_test = train_test_split(
digits.data, digits.target, random_state=0)
# Create an active process
active_process = self.portal_activities.unrestrictedTraverse(active_process_path)
# Use CMFActivity as a backend for joblib
with parallel_backend('CMFActivity', n_jobs=2, active_process=active_process):
final_model = grow_ensemble(RandomForestClassifier(), X_train, y_train,
......@@ -62,7 +88,30 @@ def test_function(self, active_process_path):
# Set result value and an id to the active result and post it
result = ActiveResult(result=score)
result.sig = 123
result.signature = 123
active_process.postResult(result)
log('ok', len(final_model.estimators_))
return 'ok', len(final_model.estimators_), score
#
# Example: grid search function
#
def example_grid_search_function(self, active_process_path):
    """Time an SVC grid search on the digits dataset run through CMFActivity.

    active_process_path -- path, traversable from portal_activities, of the
                           active process handed to the joblib backend

    Returns a tuple: the string 'ok', the joblib version and the seconds
    elapsed since just before the fit started.
    """
    dataset = load_digits()
    samples = np.ascontiguousarray(dataset.data)
    labels = np.ascontiguousarray(dataset.target)

    # 3 values of C x 3 values of gamma x 1 tolerance.
    search_space = {
        'C': np.logspace(-10, 10, 3),
        'gamma': np.logspace(-10, 10, 3),
        'tol': [1e-4],
    }
    searcher = GridSearchCV(SVC(), param_grid=search_space, verbose=10)

    process = self.portal_activities.unrestrictedTraverse(active_process_path)
    started = time.time()
    # Every joblib call issued by fit() goes through the 'CMFActivity' backend.
    with parallel_backend('CMFActivity', n_jobs=2, active_process=process):
        searcher.fit(samples, labels)
    return 'ok', joblib.__version__, time.time() - started
......@@ -14,7 +14,7 @@
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>joblibSimpleFunction</string> </value>
<value> <string>joblibUseCaseExamples</string> </value>
</item>
<item>
<key> <string>description</string> </key>
......@@ -24,7 +24,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>extension.erp5.joblibSimpleFunction</string> </value>
<value> <string>extension.erp5.joblibUseCaseExamples</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
......
......@@ -6,4 +6,4 @@ from Products.CMFActivity.ActiveResult import ActiveResult
result = batch_function()
return ActiveResult(result=result, sig=hash)
return ActiveResult(result=result, signature=hash)
# Driver script: create a fresh active process and queue the random forest
# example on it as an activity.
import time  # NOTE(review): not referenced anywhere in this script
# Active process that the queued function receives and posts its result to.
active_process = context.portal_activities.newActiveProcess()
# NOTE(review): presumably makes the process store results keyed by id so they
# can be fetched individually -- confirm against ActiveProcess.
active_process.useBTree()
active_process_id = active_process.getId()  # NOTE(review): assigned but never used below
path = active_process.getPhysicalPath()
# Queue Base_joblibRandomForestFunction on the SQLQueue activity, passing it the
# physical path of the active process.
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process).Base_joblibRandomForestFunction(path)
# Hand the path back so the caller can traverse to the active process later.
return path
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>REQUEST=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_driverScriptRandomForest</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
# Driver script: create a fresh active process and queue the simple square-root
# example on it as an activity.
import time  # NOTE(review): not referenced anywhere in this script
# Active process that the queued function receives and posts its result to.
active_process = context.portal_activities.newActiveProcess()
# NOTE(review): presumably makes the process store results keyed by id so they
# can be fetched individually -- confirm against ActiveProcess.
active_process.useBTree()
active_process_id = active_process.getId()  # NOTE(review): assigned but never used below
path = active_process.getPhysicalPath()
# Queue Base_joblibSimpleFunction on the SQLQueue activity, passing it the
# physical path of the active process.
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process).Base_joblibSimpleFunction(path)
# Hand the path back so the caller can traverse to the active process later.
return path
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>REQUEST=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_driverScriptSquareRoot</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>test</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>joblibGridSearch</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_joblibGridSearchFunction</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>test_function</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>joblibRandomForest</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_joblibRandomForestFunction</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>test</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>joblibSimpleFunction</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_joblibSimpleFunction</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -26,6 +26,7 @@
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ExternalMethod.ExternalMethod import manage_addExternalMethod
class Test(ERP5TypeTestCase):
"""
......@@ -36,14 +37,52 @@ class Test(ERP5TypeTestCase):
return "TestJoblibUsecases"
def test_randomForest(self):
path = self.portal.Base_driverScriptRandomForest()
portal_skins = self.getPortal().portal_skins
manage_addExternalMethod(self.portal,
'Base_joblibRandomForestFunction',
'Base_joblibRandomForestFunction',
'joblibUseCaseExamples',
'example_random_forest_function')
skin_folder = getattr(portal_skins, "erp5_joblib")
skin_folder.manage_addProduct['PythonScripts'].manage_addPythonScript(id='Base_driverScriptRandomForest')
script = getattr(skin_folder, 'Base_driverScriptRandomForest')
script.ZPythonScript_edit('**kw', """import time
active_process = context.portal_activities.newActiveProcess()
active_process.useBTree()
active_process_id = active_process.getId()
path = active_process.getPhysicalPath()
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process).Base_joblibRandomForestFunction(path)
return path""")
path = portal_skins.erp5_joblib.Base_driverScriptRandomForest()
self.tic(1)
active_process = self.portal.portal_activities.unrestrictedTraverse(path)
active_process = portal_skins.erp5_joblib.portal_activities.unrestrictedTraverse(path)
result = active_process.getResult(123)
self.assertEquals(0.98444444444444446, result.result)
def test_UnderRootOfSquaresFunction(self):
path = self.portal.Base_driverScriptSquareRoot()
portal_skins = self.getPortal().portal_skins
manage_addExternalMethod(self.portal,
'Base_joblibSimpleFunction',
'Base_joblibSimpleFunction',
'joblibUseCaseExamples',
'example_simple_function')
skin_folder = getattr(portal_skins, "erp5_joblib")
skin_folder.manage_addProduct['PythonScripts'].manage_addPythonScript(id='Base_driverScriptSquareRoot')
script = getattr(skin_folder, 'Base_driverScriptSquareRoot')
script.ZPythonScript_edit('**kw', """import time
active_process = context.portal_activities.newActiveProcess()
active_process.useBTree()
active_process_id = active_process.getId()
path = active_process.getPhysicalPath()
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process).Base_joblibSimpleFunction(path)
return path""")
path = portal_skins.erp5_joblib.Base_driverScriptSquareRoot()
self.tic(1)
active_process = self.portal.portal_activities.unrestrictedTraverse(path)
result = active_process.getResult(12345)
......
......@@ -14,7 +14,7 @@
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testJoblibActivityUsecases</string> </value>
<value> <string>testJoblibActivityUseCase</string> </value>
</item>
<item>
<key> <string>description</string> </key>
......@@ -24,7 +24,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testJoblibActivityUsecases</string> </value>
<value> <string>test.erp5.testJoblibActivityUseCase</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
......
extension.erp5.joblibGridSearch
extension.erp5.joblibRandomForest
extension.erp5.joblibSimpleFunction
\ No newline at end of file
extension.erp5.joblibUseCaseExamples
\ No newline at end of file
test.erp5.testJoblibActivityUsecases
\ No newline at end of file
test.erp5.testJoblibActivityUseCase
\ No newline at end of file
......@@ -108,10 +108,10 @@ class ActiveProcess(Base):
self.result_list = result_list = ConflictFreeLog()
else:
if self.use_btree:
if not hasattr(result, 'sig'):
if not hasattr(result, 'signature'):
result_id = randrange(0, 10000 * (id(result) + 1))
else:
result_id = result.sig
result_id = result.signature
result_list.insert(result_id, result)
return
......@@ -126,7 +126,7 @@ class ActiveProcess(Base):
return
if self.use_btree:
signature = int(result.sig, 16)
signature = int(result.signature, 16)
result_list.insert(signature, result)
else:
result_list.append(result)
......@@ -158,7 +158,7 @@ class ActiveProcess(Base):
try:
result_list = self.result_list
result = result_list[key]
except:
except KeyError:
return None
return result
......
......@@ -44,7 +44,7 @@ from SQLDict import SQLDict
class SQLJoblib(SQLDict):
"""
XXX SQLJoblib
A simple OOBT-based queue. It is non-transactional and follows the always-execute paradigm.
"""
sql_table = 'message_job'
uid_group = 'portal_activity_job'
......@@ -78,7 +78,7 @@ class SQLJoblib(SQLDict):
if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool
self.prepareMessage(activity_tool, message)
def prepareMessage(self, activity_tool, m):
portal = activity_tool.getPortalObject()
if m.is_registered:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment