Commit 5cf1ee58 authored by Hardik Juneja

progress

parent a577d8f0
...@@ -5,14 +5,14 @@ import time ...@@ -5,14 +5,14 @@ import time
from math import sqrt from math import sqrt
def abc(num): def abc(num):
time.sleep(5) time.sleep(2)
return sqrt(num) return sqrt(num)
def test(self, active_process_path): def test(self, active_process_path):
active_process = self.portal_activities.unrestrictedTraverse(active_process_path) active_process = self.portal_activities.unrestrictedTraverse(active_process_path)
with parallel_backend('CMFActivity', active_process=active_process): with parallel_backend('CMFActivity', active_process=active_process):
result = Parallel(n_jobs=2, timeout=30, verbose=30)(delayed(abc)(i**2) for i in range(4)) result = Parallel(n_jobs=2, pre_dispatch='all', timeout=30, verbose=30)(delayed(abc)(i**2) for i in range(20))
log("I am here", result) log("I am here", result)
return result return result
\ No newline at end of file
...@@ -24,8 +24,8 @@ def test(self, active_process_path): ...@@ -24,8 +24,8 @@ def test(self, active_process_path):
y = np.ascontiguousarray(y) y = np.ascontiguousarray(y)
clf = GridSearchCV(SVC(), param_grid=param_grid, verbose=10) clf = GridSearchCV(SVC(), param_grid=param_grid, verbose=10)
active_process = self.portal_activities.unrestrictedTraverse(active_process_path) active_process = self.portal_activities.unrestrictedTraverse(active_process_path)
with parallel_backend('CMFActivity', n_jobs=2, active_process=active_process):
tic = time.time() tic = time.time()
with parallel_backend('CMFActivity', n_jobs=2, active_process=active_process):
clf.fit(X, y) clf.fit(X, y)
log("I am here", time.time()-tic) log("I am here", time.time()-tic)
return 'ok', sklearn.__version__, joblib.__version__, time.time() - tic return 'ok', sklearn.__version__, joblib.__version__, time.time() - tic
...@@ -5,7 +5,8 @@ from Products.ERP5Type.Log import log ...@@ -5,7 +5,8 @@ from Products.ERP5Type.Log import log
timeout = 10 timeout = 10
active_process = context.portal_activities.newActiveProcess() active_process = context.portal_activities.newActiveProcess()
active_process.useBTree() active_process.useBTree()
active_process_id = active_process.getId() active_process_id = active_process.getId()
path = active_process.getPhysicalPath() path = active_process.getPhysicalPath()
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process, tag='abc').Base_joblibGridSearchFunction(path) context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process, tag='abc').Base_joblibRandomForestFunction(path)
return path return path
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>test_function</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>joblibRandomForest</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_joblibRandomForestFunction</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
...@@ -124,7 +124,8 @@ class ActiveProcess(Base): ...@@ -124,7 +124,8 @@ class ActiveProcess(Base):
return return
if self.use_btree: if self.use_btree:
result_list.insert(result.sig, result) signature = int(result.sig, 16)
result_list.insert(signature, result)
else: else:
result_list.append(result) result_list.append(result)
......
...@@ -53,7 +53,6 @@ from SQLDict import SQLDict ...@@ -53,7 +53,6 @@ from SQLDict import SQLDict
class MyBatchedSignature(object): class MyBatchedSignature(object):
"""Create hashable signature""" """Create hashable signature"""
def __init__(self, batch): def __init__(self, batch):
#LOG('CMFActivity', INFO, batch.items)
items = batch.items[0] items = batch.items[0]
self.func = items[0].__name__ self.func = items[0].__name__
self.args = items[1] self.args = items[1]
...@@ -108,8 +107,6 @@ class SQLJoblib(SQLDict): ...@@ -108,8 +107,6 @@ class SQLJoblib(SQLDict):
if m.is_registered: if m.is_registered:
uid = portal.portal_ids.generateNewIdList(self.uid_group, uid = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=1, id_generator='uid')[0] id_count=1, id_generator='uid')[0]
#import pdb; pdb.set_trace()
LOG("CMFActivityBackendEntered", INFO, m.activity_kw.get('signature', 0))
m.order_validation_text = x = self.getOrderValidationText(m) m.order_validation_text = x = self.getOrderValidationText(m)
processing_node = (0 if x == 'none' else -1) processing_node = (0 if x == 'none' else -1)
portal.SQLJoblib_writeMessage( portal.SQLJoblib_writeMessage(
...@@ -123,7 +120,7 @@ class SQLJoblib(SQLDict): ...@@ -123,7 +120,7 @@ class SQLJoblib(SQLDict):
group_method_id=m.getGroupId(), group_method_id=m.getGroupId(),
date=m.activity_kw.get('at_date'), date=m.activity_kw.get('at_date'),
tag=m.activity_kw.get('tag', ''), tag=m.activity_kw.get('tag', ''),
signature=m.activity_kw.get('signature', 0), signature=m.activity_kw.get('signature', ''),
processing_node=processing_node, processing_node=processing_node,
serialization_tag=m.activity_kw.get('serialization_tag', '')) serialization_tag=m.activity_kw.get('serialization_tag', ''))
...@@ -150,7 +147,6 @@ class SQLJoblib(SQLDict): ...@@ -150,7 +147,6 @@ class SQLJoblib(SQLDict):
signature=signature) signature=signature)
reserve_uid_list = uid_list = [x.uid for x in result] reserve_uid_list = uid_list = [x.uid for x in result]
if reserve_uid_list: if reserve_uid_list:
LOG("CMFActivityBackendMarked", INFO, signature, uid_list)
activity_tool.SQLJoblib_reserveDuplicatedLineList( activity_tool.SQLJoblib_reserveDuplicatedLineList(
processing_node=processing_node, uid=reserve_uid_list) processing_node=processing_node, uid=reserve_uid_list)
except: except:
...@@ -210,13 +206,13 @@ class SQLJoblib(SQLDict): ...@@ -210,13 +206,13 @@ class SQLJoblib(SQLDict):
if result: if result:
load = self.getProcessableMessageLoader(activity_tool, processing_node) load = self.getProcessableMessageLoader(activity_tool, processing_node)
m, uid, uid_list = load(result[0]) m, uid, uid_list = load(result[0])
LOG("CMFActivityBackendExecuting", INFO, m.signature)
# This handles cases wehre the result has been already calculated # This handles cases wehre the result has been already calculated
# but the duplicate message(s) somehow landed in the queue, # but the duplicate message(s) somehow landed in the queue,
# we should not execute these messages and its duplicates again # we should not execute these messages and its duplicates again
# hence just delete them. # hence just delete them.
active_process = activity_tool.unrestrictedTraverse(m.active_process) active_process = activity_tool.unrestrictedTraverse(m.active_process)
if active_process.getResult(m.signature): sigint = int(m.signature, 16) % (10 ** 16)
if active_process.getResult(sigint):
uid_list.append(uid) uid_list.append(uid)
LOG("CMFActivityBackendDeleting", INFO, m.signature) LOG("CMFActivityBackendDeleting", INFO, m.signature)
self.finalizeMessageExecution(activity_tool, [], None, uid_list) self.finalizeMessageExecution(activity_tool, [], None, uid_list)
......
...@@ -36,6 +36,7 @@ from ZODB.POSException import ConflictError ...@@ -36,6 +36,7 @@ from ZODB.POSException import ConflictError
try: try:
from sklearn.externals.joblib import register_parallel_backend from sklearn.externals.joblib import register_parallel_backend
from sklearn.externals.joblib.hashing import hash
from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend
from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend
from sklearn.externals.joblib._parallel_backends import SafeFunction from sklearn.externals.joblib._parallel_backends import SafeFunction
...@@ -123,15 +124,15 @@ if ENABLE_JOBLIB: ...@@ -123,15 +124,15 @@ if ENABLE_JOBLIB:
portal_activities = self.active_process.portal_activities portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId() active_process_id = self.active_process.getId()
joblib_result = None joblib_result = None
sig = make_hash(batch.items[0]) sig = hash(batch.items[0])
sigint = int(sig, 16) % (10 ** 16)
if not self.active_process.getResult(sig): if not self.active_process.getResult(sigint):
joblib_result = portal_activities.activate(activity='SQLJoblib', joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id, tag="joblib_%s" % active_process_id,
signature=sig, signature=sig,
active_process=self.active_process).Base_callSafeFunction(sig, MySafeFunction(batch)) active_process=self.active_process).Base_callSafeFunction(sigint, MySafeFunction(batch))
if joblib_result is None: if joblib_result is None:
joblib_result = CMFActivityResult(self.active_process, sig, callback) joblib_result = CMFActivityResult(self.active_process, sigint, callback)
return joblib_result return joblib_result
def configure(self, n_jobs=1, parallel=None, **backend_args): def configure(self, n_jobs=1, parallel=None, **backend_args):
......
...@@ -20,7 +20,7 @@ CREATE TABLE <dtml-var table> ( ...@@ -20,7 +20,7 @@ CREATE TABLE <dtml-var table> (
`priority` TINYINT NOT NULL DEFAULT 0, `priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '', `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL, `tag` VARCHAR(255) NOT NULL,
`signature` BIGINT NOT NULL, `signature` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL, `serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL, `message` LONGBLOB NOT NULL,
......
...@@ -20,5 +20,5 @@ WHERE ...@@ -20,5 +20,5 @@ WHERE
AND path = <dtml-sqlvar path type="string"> AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string"> AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string"> AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="int"> AND signature = <dtml-sqlvar signature type="string">
FOR UPDATE FOR UPDATE
...@@ -35,7 +35,7 @@ VALUES ...@@ -35,7 +35,7 @@ VALUES
<dtml-sqlvar expr="priority" type="int">, <dtml-sqlvar expr="priority" type="int">,
<dtml-sqlvar expr="group_method_id" type="string">, <dtml-sqlvar expr="group_method_id" type="string">,
<dtml-sqlvar expr="tag" type="string">, <dtml-sqlvar expr="tag" type="string">,
<dtml-sqlvar expr="signature" type="int">, <dtml-sqlvar expr="signature" type="string">,
<dtml-sqlvar expr="serialization_tag" type="string">, <dtml-sqlvar expr="serialization_tag" type="string">,
<dtml-sqlvar expr="message" type="string"> <dtml-sqlvar expr="message" type="string">
) )
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment