Commit d2c88bd6 authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: Use CMFActivity as a backend for joblib

This commit:

- Adds a new Activity called "SQLJoblib"
- Adds a Backend to be used by joblib
- Uses OOBTree to store results instead of ConflictFreeLog
- Adds a getResultDict API to fetch resut Dict

It uses the original work from rafael@nexedi.com and loic.esteve@inria.fr
parent 8455256c
...@@ -32,6 +32,7 @@ from Products.CMFCore import permissions as CMFCorePermissions ...@@ -32,6 +32,7 @@ from Products.CMFCore import permissions as CMFCorePermissions
from Products.ERP5Type.Base import Base from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet from Products.ERP5Type import PropertySheet
from Products.ERP5Type.ConflictFree import ConflictFreeLog from Products.ERP5Type.ConflictFree import ConflictFreeLog
from BTrees.OOBTree import OOBTree
from BTrees.Length import Length from BTrees.Length import Length
from random import randrange from random import randrange
from .ActiveResult import ActiveResult from .ActiveResult import ActiveResult
...@@ -87,6 +88,9 @@ class ActiveProcess(Base): ...@@ -87,6 +88,9 @@ class ActiveProcess(Base):
security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
def postResult(self, result): def postResult(self, result):
try:
result_id = result.id
except AttributeError:
try: try:
result_list = self.result_list result_list = self.result_list
except AttributeError: except AttributeError:
...@@ -103,6 +107,11 @@ class ActiveProcess(Base): ...@@ -103,6 +107,11 @@ class ActiveProcess(Base):
self.result_len.change(1) self.result_len.change(1)
return return
result_list.append(result) result_list.append(result)
else:
try:
self.result_dict[result_id] = result
except AttributeError:
self.result_dict = OOBTree({result.id: result})
security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult')
def postActiveResult(self, *args, **kw): def postActiveResult(self, *args, **kw):
...@@ -126,6 +135,17 @@ class ActiveProcess(Base): ...@@ -126,6 +135,17 @@ class ActiveProcess(Base):
return result_list.values() return result_list.values()
return list(result_list) return list(result_list)
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultDict')
def getResultDict(self, **kw):
"""
Returns the result Dict
"""
try:
return self.result_dict
except AttributeError:
self.result_dict = result_dict = OOBTree()
return result_dict
security.declareProtected( CMFCorePermissions.View, 'hasActivity' ) security.declareProtected( CMFCorePermissions.View, 'hasActivity' )
def hasActivity(self, **kw): def hasActivity(self, **kw):
""" """
......
##############################################################################
#
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from SQLBase import SQLBase, sort_message_key, MAX_MESSAGE_LIST_SIZE
from Products.CMFActivity.ActivityTool import Message
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
from SQLDict import SQLDict
class SQLJoblib(SQLDict):
"""
An extention of SQLDict, It is non transatactional and follow always-excute paradigm.
It uses a dictionary to store results and with hash of arguments as keys
"""
sql_table = 'message_job'
uid_group = 'portal_activity_job'
def initialize(self, activity_tool, clear):
"""
Initialize the message table using MYISAM Engine
"""
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = folder.SQLJoblib_createMessageTable
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable()
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
uid_list = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=len(message_list), id_generator='uid')
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
signature_list=[m.activity_kw.get('signature', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
portal.SQLJoblib_writeMessage(
uid_list=uid_list,
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=map(Message.dump, message_list),
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
signature_list=signature_list,
serialization_tag_list=serialization_tag_list)
def getProcessableMessageLoader(self, activity_tool, processing_node):
path_and_method_id_dict = {}
def load(line):
# getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path, method_id
# and signature
path = line.path
method_id = line.method_id
key = path, method_id
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
m = Message.load(line.message, uid=uid, line=line)
try:
result = activity_tool.SQLJoblib_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id,
signature=line.signature)
reserve_uid_list = uid_list = [x.uid for x in result]
if reserve_uid_list:
activity_tool.SQLBase_reserveMessageList(
table=self.sql_table,
processing_node=processing_node,
uid=reserve_uid_list)
except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception')
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
path_and_method_id_dict[key] = uid
return m, uid, uid_list
# We know that original_uid != uid because caller skips lines we returned
# earlier.
return None, original_uid, [uid]
return load
...@@ -288,6 +288,9 @@ class Message(BaseMessage): ...@@ -288,6 +288,9 @@ class Message(BaseMessage):
def activateResult(self, active_process, result, object): def activateResult(self, active_process, result, object):
if not isinstance(result, ActiveResult): if not isinstance(result, ActiveResult):
result = ActiveResult(result=result) result = ActiveResult(result=result)
signature = self.activity_kw.get('signature')
if signature:
result.edit(id=signature)
# XXX Allow other method_id in future # XXX Allow other method_id in future
result.edit(object_path=object, method_id=self.method_id) result.edit(object_path=object, method_id=self.method_id)
active_process.postResult(result) active_process.postResult(result)
...@@ -459,7 +462,7 @@ allow_class(GroupedMessage) ...@@ -459,7 +462,7 @@ allow_class(GroupedMessage)
# Activity Registration # Activity Registration
def activity_dict(): def activity_dict():
from Activity import SQLDict, SQLQueue from Activity import SQLDict, SQLQueue, SQLJoblib
return {k: getattr(v, k)() for k, v in locals().iteritems()} return {k: getattr(v, k)() for k, v in locals().iteritems()}
activity_dict = activity_dict() activity_dict = activity_dict()
...@@ -655,6 +658,9 @@ class ActivityTool (Folder, UniqueObject): ...@@ -655,6 +658,9 @@ class ActivityTool (Folder, UniqueObject):
for activity in activity_dict.itervalues(): for activity in activity_dict.itervalues():
activity.initialize(self, clear=False) activity.initialize(self, clear=False)
def _callSafeFunction(self, batch_function):
return batch_function()
security.declareProtected(Permissions.manage_properties, 'isSubscribed') security.declareProtected(Permissions.manage_properties, 'isSubscribed')
def isSubscribed(self): def isSubscribed(self):
""" """
......
...@@ -41,6 +41,7 @@ document_classes = updateGlobals(this_module, globals(), ...@@ -41,6 +41,7 @@ document_classes = updateGlobals(this_module, globals(),
def initialize( context ): def initialize( context ):
# Define object classes and tools # Define object classes and tools
import ActivityTool, ActiveProcess, ActivityConnection import ActivityTool, ActiveProcess, ActivityConnection
from .joblib import CMFActivityParallelBackend
object_classes = (ActiveProcess.ActiveProcess, object_classes = (ActiveProcess.ActiveProcess,
#ActivityConnection.ActivityConnection #ActivityConnection.ActivityConnection
) )
......
...@@ -63,7 +63,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. ...@@ -63,7 +63,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
</td> </td>
</dtml-if> </dtml-if>
<td align="left" valign="top"><dtml-var uid> <td align="left" valign="top"><dtml-var uid>
<dtml-var expr="{'SQLDict':'(dict)','SQLQueue':'(queue)'}[activity]"> <dtml-var expr="{'SQLDict':'(dict)','SQLQueue':'(queue)','SQLJoblib':'(Joblib)'}[activity]">
</td> </td>
<td align="left" valign="top"><a href="<dtml-var expr="REQUEST.physicalPathToURL(path)">"><dtml-var path></a></td> <td align="left" valign="top"><a href="<dtml-var expr="REQUEST.physicalPathToURL(path)">"><dtml-var path></a></td>
<td align="left" valign="top"><dtml-var method_id></td> <td align="left" valign="top"><dtml-var method_id></td>
......
##############################################################################
#
# Copyright (c) 2016 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zLOG import LOG, INFO, WARNING
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityRuntimeEnvironment import \
getActivityRuntimeEnvironment
try:
from sklearn.externals.joblib import register_parallel_backend
from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend
from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend
from sklearn.externals.joblib.hashing import hash as joblib_hash
except ImportError:
LOG(__name__, WARNING, "Joblib cannot be imported, support disabled")
else:
class JoblibResult(object):
def __init__(self, result, callback):
self.result = result
self.callback = callback
def get(self, timeout=None):
result = self.result.result
callback = self.callback
if callback is not None:
callback(result)
return result
class JoblibDispatch(object):
def __init__(self, backend):
self.backend = backend
def get(self, timeout=None):
backend = self.backend
def onError(exc_type, exc_value, traceback):
active_process = backend.active_process
activate = active_process.getParentValue().activate
kw = {
'active_process': active_process,
'activity': 'SQLJoblib',
'tag': "joblib_" + active_process.getId(),
}
for sig, batch in backend.job_list:
activate(signature=sig, **kw)._callSafeFunction(batch)
getActivityRuntimeEnvironment().edit(on_error_callback=onError)
raise ConflictError
class CMFActivityBackend(ParallelBackendBase):
def __init__(self, *args, **kw):
self.active_process = active_process = kw['active_process']
self.job_list = []
self.result_dict = active_process.getResultDict()
def effective_n_jobs(self, n_jobs):
"""Dummy implementation to prevent n_jobs <=0
and allow (sequential) n_jobs=1 and n_jobs != 1 (parallel) behaviour
"""
if n_jobs == 0:
raise ValueError('n_jobs == 0 in Parallel has no meaning')
return abs(n_jobs)
def apply_async(self, batch, callback=None):
"""Schedule a func to be run"""
sig = joblib_hash(batch)
result = self.result_dict.get(sig)
if result is None:
self.job_list.append((sig, batch))
return JoblibDispatch(self)
return JoblibResult(result, callback)
def configure(self, n_jobs=1, parallel=None, **backend_args):
"""Reconfigure the backend and return the number of workers. This
makes it possible to reuse an existing backend instance for successive
independent calls to Parallel with different parameters."""
if n_jobs == 1:
raise FallbackToBackend(SequentialBackend())
self.parallel = parallel
return self.effective_n_jobs(n_jobs)
def abort_everything(self, ensure_ready=True):
# All jobs will be aborted here while they are still processing our backend
if ensure_ready:
self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
**self.parallel._backend_args)
return
register_parallel_backend('CMFActivity', CMFActivityBackend)
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
CREATE TABLE message_job (
`uid` INT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BINARY(16) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=InnoDB
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
signature
</params>
SELECT uid FROM
message_job
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
uid_list
path_list
active_process_uid_list
method_id_list
message_list
priority_list
processing_node_list
date_list
group_method_id_list
tag_list
signature_list
serialization_tag_list
</params>
INSERT INTO message_job
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="uid_list[loop_item]" type="int">,
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list is not None"><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if><dtml-else>UTC_TIMESTAMP()</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="signature_list[loop_item]" type="string">,
<dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment