Commit eb5b8287 authored by Hardik's avatar Hardik Committed by Hardik Juneja

CMFActivity: Remove transactions and sleep and use conflict error instead

parent 49e3a5dd
...@@ -43,7 +43,7 @@ INVALID_ORDER = 2 ...@@ -43,7 +43,7 @@ INVALID_ORDER = 2
# Time global parameters # Time global parameters
MAX_PROCESSING_TIME = 900 # in seconds MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 15 # in seconds VALIDATION_ERROR_DELAY = 1 # in seconds
class Queue(object): class Queue(object):
""" """
......
...@@ -117,6 +117,11 @@ class SQLBase(Queue): ...@@ -117,6 +117,11 @@ class SQLBase(Queue):
" The following added columns could not be initialized: %s" " The following added columns could not be initialized: %s"
% (self.sql_table, ", ".join(column_list))) % (self.sql_table, ", ".join(column_list)))
def register(self, activity_buffer, activity_tool, message):
activity_buffer.register(activity_tool)
assert not message.is_registered, message
self.registerMessage(activity_buffer, activity_tool, message)
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered] registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject() portal = activity_tool.getPortalObject()
...@@ -134,6 +139,7 @@ class SQLBase(Queue): ...@@ -134,6 +139,7 @@ class SQLBase(Queue):
serialization_tag_list = [m.activity_kw.get('serialization_tag', '') serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list] for m in message_list]
processing_node_list = [] processing_node_list = []
for m in message_list: for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m) m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1) processing_node_list.append(0 if x == 'none' else -1)
......
...@@ -26,8 +26,75 @@ ...@@ -26,8 +26,75 @@
# #
############################################################################## ##############################################################################
# XXX: Note from Rafael
# only reimplment the minimal, and only custom the SQL that update this table.
# Always check if things are there (ie.: If the connection, or the script are present).
import copy
import hashlib
import sys
import transaction
from functools import total_ordering
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from zExceptions import ExceptionFormatter
from ZODB.POSException import ConflictError
from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityTool import Message
from Products.CMFActivity.ActivityTool import (
Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
_DequeueMessageException = Exception()
from SQLDict import SQLDict from SQLDict import SQLDict
# this is improvisation of
# http://stackoverflow.com/questions/5884066/hashing-a-python-dictionary/8714242#8714242
def make_hash(o):
"""
Makes a hash from a dictionary, list, tuple or set to any level, that contains
only other hashable types (including any lists, tuples, sets, and
dictionaries).
"""
if isinstance(o, (set, tuple, list)):
return hash(tuple([make_hash(e) for e in o]))
elif not isinstance(o, dict):
try:
return hash(o)
except TypeError:
return hash(int(hashlib.md5(o).hexdigest(), 16))
new_o = copy.deepcopy(o)
for k, v in new_o.items():
new_o[k] = make_hash(v)
return hash(tuple(frozenset(sorted(new_o.items()))))
@total_ordering
class MyBatchedSignature(object):
"""Create hashable signature"""
def __init__(self, batch):
#LOG('CMFActivity', INFO, batch.items)
items = batch.items[0]
self.func = items[0].__name__
self.args = items[1]
self.kwargs = items[2]
def __eq__(self, other):
return (self.func, self.args) == (other.func, other.args)
def __lt__(self, other):
return (self.func, self.args) < (other.func, other.args)
class SQLJoblib(SQLDict): class SQLJoblib(SQLDict):
""" """
XXX SQLJoblib XXX SQLJoblib
...@@ -35,3 +102,131 @@ class SQLJoblib(SQLDict): ...@@ -35,3 +102,131 @@ class SQLJoblib(SQLDict):
sql_table = 'message_job' sql_table = 'message_job'
uid_group = 'portal_activity_job' uid_group = 'portal_activity_job'
def initialize(self, activity_tool, clear):
"""
Initialize the message table using MYISAM Engine
"""
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = folder.SQLJoblib_createMessageTable
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable(table=self.sql_table)
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
def register(self, activity_buffer, activity_tool, message):
"""
Send message to mysql directly
"""
assert not message.is_registered, message
message.is_registered = True
if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool
self.prepareMessage(activity_tool, message)
def prepareMessage(self, activity_tool, m):
portal = activity_tool.getPortalObject()
if m.is_registered:
uid = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=1, id_generator='uid')[0]
#import pdb; pdb.set_trace()
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node = (0 if x == 'none' else -1)
portal.SQLJoblib_writeMessage(
table=self.sql_table,
uid=uid,
path='/'.join(m.object_path),
active_process_uid=m.active_process_uid,
method_id=m.method_id,
priority=m.activity_kw.get('priority', 1),
message=Message.dump(m),
group_method_id=m.getGroupId(),
date=m.activity_kw.get('at_date'),
tag=m.activity_kw.get('tag', ''),
processing_node=processing_node,
serialization_tag=m.activity_kw.get('serialization_tag', ''))
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if message_list:
# Remove group_id parameter from group_method_id
if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0]
if group_method_id not in (None, ""):
method = activity_tool.invokeGroup
args = (group_method_id, message_list, self.__class__.__name__,
hasattr(self, 'generateMessageUID'))
activity_runtime_environment = ActivityRuntimeEnvironment(None)
else:
method = activity_tool.invoke
message = message_list[0]
args = (message, )
activity_runtime_environment = ActivityRuntimeEnvironment(message)
# Commit right before executing messages.
# As MySQL transaction does not start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called
# on objects which are not available - or available in an old
# version - to ZODB connector.
# So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects.
transaction.commit()
transaction.begin()
tv = getTransactionalVariable()
tv['activity_runtime_environment'] = activity_runtime_environment
# Try to invoke
try:
method(*args)
# Abort if at least 1 message failed. On next tic, only those that
# succeeded will be selected because their at_date won't have been
# increased.
for m in message_list:
if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
raise _DequeueMessageException
transaction.commit()
for m in message_list:
if m.getExecutionState() == MESSAGE_EXECUTED:
transaction.begin()
# Create a signature and then store result into the dict
signature = MyBatchedSignature(m.args[0].batch)
# get active process
active_process = activity_tool.unrestrictedTraverse(m.active_process)
active_process.process_result_map.update({signature: m.result})
transaction.commit()
except:
exc_info = sys.exc_info()
if exc_info[1] is not _DequeueMessageException:
self._log(WARNING,
'Exception raised when invoking messages (uid, path, method_id) %r'
% [(m.uid, m.object_path, m.method_id) for m in message_list])
for m in message_list:
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
self._abort()
exc_info = message_list[0].exc_info
if exc_info:
try:
# Register it again.
tv['activity_runtime_environment'] = activity_runtime_environment
cancel = message.on_error_callback(*exc_info)
del exc_info, message.exc_info
transaction.commit()
if cancel:
message.setExecutionState(MESSAGE_EXECUTED)
except:
self._log(WARNING, 'Exception raised when processing error callbacks')
message.setExecutionState(MESSAGE_NOT_EXECUTED)
self._abort()
self.finalizeMessageExecution(activity_tool, message_list,
uid_to_duplicate_uid_list_dict)
transaction.commit()
return not message_list
\ No newline at end of file
...@@ -48,7 +48,7 @@ class ActivityBuffer(TM): ...@@ -48,7 +48,7 @@ class ActivityBuffer(TM):
def getUidSet(self, activity): def getUidSet(self, activity):
return self.uid_set_dict[activity] return self.uid_set_dict[activity]
def _register(self, activity_tool): def register(self, activity_tool):
TM._register(self) TM._register(self)
if self.activity_tool is None: if self.activity_tool is None:
self.activity_tool = activity_tool self.activity_tool = activity_tool
...@@ -70,9 +70,7 @@ class ActivityBuffer(TM): ...@@ -70,9 +70,7 @@ class ActivityBuffer(TM):
raise raise
def deferredQueueMessage(self, activity_tool, activity, message): def deferredQueueMessage(self, activity_tool, activity, message):
self._register(activity_tool) activity.register(self, activity_tool, message)
assert not message.is_registered, message
activity.registerMessage(self, activity_tool, message)
def sortKey(self, *ignored): def sortKey(self, *ignored):
"""Activities must be finished before databases commit transactions.""" """Activities must be finished before databases commit transactions."""
......
...@@ -30,7 +30,9 @@ import sys ...@@ -30,7 +30,9 @@ import sys
import time import time
import transaction import transaction
from BTrees.OOBTree import OOBTree
from zLOG import LOG, INFO, WARNING from zLOG import LOG, INFO, WARNING
from ZODB.POSException import ConflictError
try: try:
from sklearn.externals.joblib import register_parallel_backend from sklearn.externals.joblib import register_parallel_backend
...@@ -43,12 +45,17 @@ except ImportError: ...@@ -43,12 +45,17 @@ except ImportError:
LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!") LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!")
ENABLE_JOBLIB = False ENABLE_JOBLIB = False
from Activity.SQLJoblib import MyBatchedSignature
if ENABLE_JOBLIB: if ENABLE_JOBLIB:
class MySafeFunction(SafeFunction): class MySafeFunction(SafeFunction):
"""Wrapper around a SafeFunction that catches any exception """Wrapper around a SafeFunction that catches any exception
The exception can be handled in CMFActivityResult.get The exception can be handled in CMFActivityResult.get
""" """
def __init__(self, *args, **kwargs):
super(MySafeFunction, self).__init__(*args, **kwargs)
self.batch = args[0]
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
try: try:
return super(MySafeFunction, self).__call__(*args, **kwargs) return super(MySafeFunction, self).__call__(*args, **kwargs)
...@@ -56,10 +63,13 @@ if ENABLE_JOBLIB: ...@@ -56,10 +63,13 @@ if ENABLE_JOBLIB:
return exc return exc
class CMFActivityResult(object): class CMFActivityResult(object):
def __init__(self, active_process, callback): def __init__(self, active_process, active_process_sig, callback):
self.active_process = active_process self.active_process = active_process
self.active_process_sig = active_process_sig
self.callback = callback self.callback = callback
def get(self, timeout=None): def get(self, timeout=None):
'''
while not self.active_process.getResultList(): while not self.active_process.getResultList():
time.sleep(1) time.sleep(1)
if timeout is not None: if timeout is not None:
...@@ -67,7 +77,12 @@ if ENABLE_JOBLIB: ...@@ -67,7 +77,12 @@ if ENABLE_JOBLIB:
if timeout < 0: if timeout < 0:
raise RuntimeError('Timeout reached') raise RuntimeError('Timeout reached')
transaction.commit() transaction.commit()
result = self.active_process.getResultList()[0].result '''
if self.active_process.process_result_map[self.active_process_sig] is None:
raise ConflictError
result = self.active_process.process_result_map[self.active_process_sig]
# TODO raise before or after the callback? # TODO raise before or after the callback?
if isinstance(result, Exception): if isinstance(result, Exception):
raise result raise result
...@@ -77,7 +92,12 @@ if ENABLE_JOBLIB: ...@@ -77,7 +92,12 @@ if ENABLE_JOBLIB:
class CMFActivityBackend(ParallelBackendBase): class CMFActivityBackend(ParallelBackendBase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.zope_context = kwargs['zope_context'] self.count = 1
self.active_process = kwargs['active_process']
if not hasattr(self.active_process, 'process_result_map'):
self.active_process.process_result_map = OOBTree()
transaction.commit()
def effective_n_jobs(self, n_jobs): def effective_n_jobs(self, n_jobs):
"""Dummy implementation to prevent n_jobs <=0 """Dummy implementation to prevent n_jobs <=0
...@@ -86,24 +106,23 @@ if ENABLE_JOBLIB: ...@@ -86,24 +106,23 @@ if ENABLE_JOBLIB:
if n_jobs == 0: if n_jobs == 0:
raise ValueError('n_jobs == 0 in Parallel has no meaning') raise ValueError('n_jobs == 0 in Parallel has no meaning')
return abs(n_jobs) return abs(n_jobs)
def apply_async(self, batch, callback=None): def apply_async(self, batch, callback=None):
"""Schedule a func to be run""" """Schedule a func to be run"""
portal_activities = self.zope_context.portal_activities portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId()
# the creation of activitiy process here, might be removed. joblib_result = None
active_process = portal_activities.newActiveProcess()
sig = MyBatchedSignature(batch)
# SQLJoblib == JoblibActivity if not self.active_process.process_result_map.has_key(sig):
joblib_result = portal_activities.activate(activity='SQLQueue', self.active_process.process_result_map.insert(sig, None)
active_process=active_process).Base_callSafeFunction(MySafeFunction(batch)) joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id,
# While activate() don't return the joblib_result active_process=self.active_process).Base_callSafeFunction(MySafeFunction(batch))
if joblib_result is None: if joblib_result is None:
# Transaction commit, is a code crime. joblib_result = CMFActivityResult(self.active_process, sig, callback)
transaction.commit()
joblib_result = CMFActivityResult(active_process, callback)
return joblib_result return joblib_result
def configure(self, n_jobs=1, parallel=None, **backend_args): def configure(self, n_jobs=1, parallel=None, **backend_args):
"""Reconfigure the backend and return the number of workers. This """Reconfigure the backend and return the number of workers. This
makes it possible to reuse an existing backend instance for successive makes it possible to reuse an existing backend instance for successive
...@@ -115,9 +134,20 @@ if ENABLE_JOBLIB: ...@@ -115,9 +134,20 @@ if ENABLE_JOBLIB:
raise FallbackToBackend(SequentialBackend()) raise FallbackToBackend(SequentialBackend())
self.parallel = parallel self.parallel = parallel
# self.zope_context = backend_args['zope_context']
return self.effective_n_jobs(n_jobs) return self.effective_n_jobs(n_jobs)
def abort_everything(self, ensure_ready=True):
# All jobs will be aborted here while they are still processing our backend
# remove job with no results
#self.active_process.process_result_map = dict((k, v)
# for k, v in self.active_process.process_result_map.iteritems() if v)
transaction.commit()
if ensure_ready:
self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
**self.parallel._backend_args)
return
register_parallel_backend('CMFActivity', CMFActivityBackend) register_parallel_backend('CMFActivity', CMFActivityBackend)
else: else:
......
...@@ -177,6 +177,7 @@ class Message(BaseMessage): ...@@ -177,6 +177,7 @@ class Message(BaseMessage):
self.method_id = method_id self.method_id = method_id
self.args = args self.args = args
self.kw = kw self.kw = kw
self.result = None
if getattr(portal_activities, 'activity_creation_trace', False): if getattr(portal_activities, 'activity_creation_trace', False):
# Save current traceback, to make it possible to tell where a message # Save current traceback, to make it possible to tell where a message
# was generated. # was generated.
...@@ -315,12 +316,12 @@ class Message(BaseMessage): ...@@ -315,12 +316,12 @@ class Message(BaseMessage):
result = method(*self.args, **self.kw) result = method(*self.args, **self.kw)
finally: finally:
setSecurityManager(old_security_manager) setSecurityManager(old_security_manager)
if method is not None: if method is not None:
if self.active_process and result is not None: if self.active_process and result is not None:
self.activateResult( self.activateResult(
activity_tool.unrestrictedTraverse(self.active_process), activity_tool.unrestrictedTraverse(self.active_process),
result, obj) result, obj)
self.result = result
self.setExecutionState(MESSAGE_EXECUTED) self.setExecutionState(MESSAGE_EXECUTED)
except: except:
self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool) self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
...@@ -503,10 +504,11 @@ class Method(object): ...@@ -503,10 +504,11 @@ class Method(object):
request=self._request, request=self._request,
portal_activities=portal_activities, portal_activities=portal_activities,
) )
if portal_activities.activity_tracking:
activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
portal_activities.getActivityBuffer().deferredQueueMessage( portal_activities.getActivityBuffer().deferredQueueMessage(
portal_activities, activity_dict[self._activity], m) portal_activities, activity_dict[self._activity], m)
if portal_activities.activity_tracking and m.is_registered:
activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
allow_class(Method) allow_class(Method)
...@@ -1063,7 +1065,6 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1063,7 +1065,6 @@ class ActivityTool (Folder, UniqueObject):
processing_node starts from 1 (there is not node 0) processing_node starts from 1 (there is not node 0)
""" """
global active_threads global active_threads
# return if the number of threads is too high # return if the number of threads is too high
# else, increase the number of active_threads and continue # else, increase the number of active_threads and continue
tic_lock.acquire() tic_lock.acquire()
......
...@@ -63,7 +63,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. ...@@ -63,7 +63,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
</td> </td>
</dtml-if> </dtml-if>
<td align="left" valign="top"><dtml-var uid> <td align="left" valign="top"><dtml-var uid>
<dtml-var expr="{'SQLDict':'(dict)','SQLQueue':'(queue)'}[activity]"> <dtml-var expr="{'SQLDict':'(dict)','SQLQueue':'(queue)','SQLJoblib':'(Joblib)'}[activity]">
</td> </td>
<td align="left" valign="top"><a href="<dtml-var expr="REQUEST.physicalPathToURL(path)">"><dtml-var path></a></td> <td align="left" valign="top"><a href="<dtml-var expr="REQUEST.physicalPathToURL(path)">"><dtml-var path></a></td>
<td align="left" valign="top"><dtml-var method_id></td> <td align="left" valign="top"><dtml-var method_id></td>
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table</params>
CREATE TABLE <dtml-var table> (
`uid` INT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=MYISAM
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid
path
active_process_uid
method_id
message
priority
processing_node
date
group_method_id
tag
serialization_tag
</params>
INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
VALUES
(
<dtml-sqlvar expr="uid" type="int">,
<dtml-sqlvar expr="path" type="string">,
<dtml-sqlvar expr="active_process_uid" type="int" optional>,
<dtml-if expr="date is not None"><dtml-if expr="date is not None"><dtml-sqlvar expr="date" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if><dtml-else>UTC_TIMESTAMP()</dtml-if>,
<dtml-sqlvar expr="method_id" type="string">,
<dtml-sqlvar expr="processing_node" type="int">,
0,
<dtml-sqlvar expr="priority" type="int">,
<dtml-sqlvar expr="group_method_id" type="string">,
<dtml-sqlvar expr="tag" type="string">,
<dtml-sqlvar expr="serialization_tag" type="string">,
<dtml-sqlvar expr="message" type="string">
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment