Commit 797a8c29 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Support grouping active objects.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@3790 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d72a36fb
......@@ -61,7 +61,8 @@ class SQLDict(RAMDict):
priority = m.activity_kw.get('priority', 1),
broadcast = m.activity_kw.get('broadcast', 0),
message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime()))
date = m.activity_kw.get('at_date', DateTime()),
group_method_id = m.activity_kw.get('group_method_id', ''))
# Also store uid of activity
def prepareQueueMessageList(self, activity_tool, message_list):
......@@ -70,24 +71,28 @@ class SQLDict(RAMDict):
if message.is_registered:
registered_message_list.append(message)
if len(registered_message_list) > 0:
#LOG('SQLDict prepareQueueMessageList', 0, 'registered_message_list = %r' % (registered_message_list,))
path_list = ['/'.join(message.object_path) for message in registered_message_list]
method_id_list = [message.method_id for message in registered_message_list]
priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
date_list = [DateTime()] * len(registered_message_list)
datetime = DateTime()
date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list]
group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list]
activity_tool.SQLDict_writeMessageList( path_list = path_list,
method_id_list = method_id_list,
priority_list = priority_list,
broadcast_list = broadcast_list,
message_list = dumped_message_list,
date_list = date_list)
date_list = date_list,
group_method_id_list = group_method_id_list)
def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction
path = '/'.join(m.object_path)
uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=m.method_id,processing_node=None)
uid_list = map(lambda x:x.uid, uid_list)
uid_list = [x.uid for x in uid_list]
if len(uid_list)>0:
activity_tool.SQLDict_delMessage(uid = uid_list)
......@@ -120,10 +125,37 @@ class SQLDict(RAMDict):
class_name = self.__class__.__name__
if hasattr(activity_buffer,'_%s_message_list' % class_name):
message_list = getattr(activity_buffer,'_%s_message_list' % class_name)
return filter(lambda m: m.is_registered, message_list)
return [m for m in message_list if m.is_registered]
else:
return ()
def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date):
validation_state = message.validate(self, activity_tool)
if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH):
# There is a serious validation error - we must lower priority
if priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = priority)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
return 1
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLDict_readMessage'):
......@@ -139,11 +171,14 @@ class SQLDict(RAMDict):
priority = None
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
if len(result) > 0:
#LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
line = result[0]
path = line.path
method_id = line.method_id
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None, to_date=now_date )
uid_list = map(lambda x:x.uid, uid_list)
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list]
priority_list = [line.priority]
# Make sure message can not be processed anylonger
if len(uid_list) > 0:
# Set selected messages to processing
......@@ -151,57 +186,78 @@ class SQLDict(RAMDict):
get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
m = self.loadMessage(line.message, uid = line.uid)
message_list = [m]
# Validate message (make sure object exists, priority OK, etc.)
validation_state = m.validate(self, activity_tool)
if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH):
# There is a serious validation error - we must lower priority
if line.priority > MAX_PRIORITY:
# This is an error
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
group_method_id = m.activity_kw.get('group_method_id')
if group_method_id is not None:
group_method = activity_tool.restrictedTraverse(group_method_id)
# Retrieve objects which have the same group method.
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
to_date=now_date, group_method_id=group_method_id)
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
for line in result:
path = line.path
method_id = line.method_id
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = line.priority + 1)
# Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
message_list.append(m)
uid_list_list.append(uid_list)
priority_list.append(line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# Not more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
if group_method_id is not None:
activity_tool.invokeGroup(group_method_id, message_list)
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
#LOG('SQLDict dequeueMessage', 0, 'invoke %s on %s' % (message_list[0].method_id, message_list[0].object_path))
activity_tool.invoke(message_list[0])
# Check if messages are executed successfully.
# When some of them are executed successfully, it may not be acceptable to
# abort the transaction, because these remain pending, only due to other
# invalid messages. This means that a group method should not be used if
# it has a side effect. For now, only indexing uses a group method, and this
# has no side effect.
for m in message_list:
if m.is_executed:
break
else:
get_transaction().abort()
for i in xrange(len(message_list)):
m = message_list[i]
uid_list = uid_list_list[i]
priority = priority_list[i]
if m.is_executed:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
if priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
......
......@@ -42,6 +42,7 @@ from AccessControl.SecurityManagement import newSecurityManager
import threading
import sys
from ZODB.POSException import ConflictError
from OFS.Traversable import NotFound
from zLOG import LOG
......@@ -86,34 +87,57 @@ class Message:
self.user_name = str(_getAuthenticatedUser(self))
# Store REQUEST Info ?
def getObject(self, activity_tool):
return activity_tool.unrestrictedTraverse(self.object_path)
def getObjectList(self, activity_tool):
try:
expand_method_id = self.activity_kw['expand_method_id']
except KeyError:
return [self.getObject()]
obj = self.getObject(activity_tool)
# FIXME: how to pass parameters?
return getattr(obj, expand_method_id)()
def hasExpandMethod(self):
return self.activity_kw.has_key('expand_method_id')
def changeUser(self, user_name, activity_tool):
uf = activity_tool.getPortalObject().acl_users
user = uf.getUserById(user_name)
if user is not None:
user = user.__of__(uf)
newSecurityManager(None, user)
return user
def activateResult(self, activity_tool, result, object):
if self.active_process is not None:
active_process = activity_tool.unrestrictedTraverse(self.active_process)
if isinstance(result,Error):
result.edit(object_path=object)
result.edit(method_id=self.method_id)
active_process.activateResult(result) # XXX Allow other method_id in future
else:
active_process.activateResult(Error(object_path=object,method_id=self.method_id,result=result)) # XXX Allow other method_id in future
def __call__(self, activity_tool):
try:
# LOG('WARNING ActivityTool', 0,
# 'Trying to call method %s on object %s' % (self.method_id, self.object_path))
object = activity_tool.unrestrictedTraverse(self.object_path)
object = self.getObject(activity_tool)
# Change user if required (TO BE DONE)
activity_tool._v_active_process = self.active_process # Store the active_process as volatile thread variable
# We will change the user only in order to execute this method
current_user = str(_getAuthenticatedUser(self))
uf = object.getPortalObject().acl_users
user = uf.getUserById(self.user_name)
if user is not None:
user = user.__of__(uf)
newSecurityManager(None, user)
user = self.changeUser(self.user_name, activity_tool)
result = getattr(object, self.method_id)(*self.args, **self.kw)
# Use again the previous user
if user is not None:
user = uf.getUserById(current_user).__of__(uf)
newSecurityManager(None, user)
if activity_tool._v_active_process is not None:
active_process = activity_tool.getActiveProcess()
if isinstance(result,Error):
result.edit(object_path=object)
result.edit(method_id=self.method_id)
active_process.activateResult(result) # XXX Allow other method_id in future
else:
active_process.activateResult(Error(object_path=object,method_id=self.method_id,result=result)) # XXX Allow other method_id in future
self.changeUser(current_user, activity_tool)
self.activateResult(activity_tool, result, object)
self.is_executed = 1
except ConflictError:
raise
except:
self.is_executed = 0
LOG('WARNING ActivityTool', 0,
......@@ -236,6 +260,8 @@ class ActivityTool (Folder, UniqueObject):
for activity in activity_list:
try:
activity.distribute(self, node_count)
except ConflictError:
raise
except:
LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity, error=sys.exc_info())
......@@ -280,6 +306,7 @@ class ActivityTool (Folder, UniqueObject):
try:
activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
#LOG('ActivityTool tic', 0, 'has_awake_activity = %r, activity = %r, activity.isAwake(self, processing_node) = %r' % (has_awake_activity, activity, activity.isAwake(self, processing_node)))
except ConflictError:
raise
except:
......@@ -356,7 +383,65 @@ class ActivityTool (Folder, UniqueObject):
def invoke(self, message):
message(self)
def invokeGroup(self, method_id, message_list):
# Invoke a group method.
object_list = []
expanded_object_list = []
new_message_list = []
path_dict = {}
# Filter the list of messages. If an object is not available, ignore such a message.
# In addition, expand an object if necessary, and make sure that no duplication happens.
for m in message_list:
try:
obj = m.getObject(self)
object_list.append(obj)
if m.hasExpandMethod():
for obj in m.getObjectList(self):
path = obj.getPath()
if path not in path_dict:
path_dict[path] = None
expanded_object_list.append(obj)
else:
path = obj.getPath()
if path not in path_dict:
path_dict[path] = None
expanded_object_list.append(obj)
new_message_list.append(m)
except ConflictError:
raise
except:
m.is_executed = 0
LOG('WARNING ActivityTool', 0,
'Could not call method %s on object %s' % (m.method_id, m.object_path), error=sys.exc_info())
if len(expanded_object_list) > 0:
try:
method = self.unrestrictedTraverse(method_id)
# FIXME: how to pass parameters?
# FIXME: how to apply security here?
result = method(expanded_object_list)
except ConflictError:
raise
except:
for m in new_message_list:
m.is_executed = 0
LOG('WARNING ActivityTool', 0,
'Could not call method %s on objects %s' % (method_id, expanded_object_list), error=sys.exc_info())
else:
for i in xrange(len(object_list)):
object = object_list[i]
m = new_message_list[i]
try:
m.activateResult(self, result, object)
m.is_executed = 1
except ConflictError:
raise
except:
m.is_executed = 0
LOG('WARNING ActivityTool', 0,
'Could not call method %s on object %s' % (m.method_id, m.object_path), error=sys.exc_info())
def newMessage(self, activity, path, active_process, activity_kw, method_id, *args, **kw):
# Some Security Cheking should be made here XXX
global is_initialized
......@@ -412,12 +497,6 @@ class ActivityTool (Folder, UniqueObject):
def reindexObject(self):
self.immediateReindexObject()
def getActiveProcess(self):
active_process = getattr(self, '_v_active_process')
if active_process:
return self.unrestrictedTraverse(active_process)
return None
# Active synchronisation methods
def validateOrder(self, message, validator_id, validation_value):
global is_initialized
......
......@@ -9,15 +9,16 @@ class_file:
</dtml-comment>
<params></params>
CREATE TABLE `message` (
`uid` int(11) NOT NULL auto_increment,
`uid` INT UNSIGNED NOT NULL auto_increment,
`date` datetime,
`path` VARCHAR(255),
`method_id` VARCHAR(40),
`method_id` VARCHAR(255),
`processing_node` INT DEFAULT -1,
`processing` INT DEFAULT 0,
`processing` TINYINT DEFAULT 0,
`processing_date` datetime,
`priority` INT DEFAULT 0,
`broadcast` INT DEFAULT 0,
`priority` TINYINT DEFAULT 0,
`broadcast` TINYINT DEFAULT 0,
`group_method_id` VARCHAR(255) DEFAULT '',
`message` BLOB,
PRIMARY KEY (`uid`),
KEY `date` (`date`),
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_rows:0
max_cache:0
cache_time:0
class_name:
......@@ -10,14 +10,23 @@ class_file:
<params>processing_node
priority
to_date
to_processing_date</params>
SELECT * FROM
to_processing_date
group_method_id</params>
SELECT DISTINCT * FROM
message
WHERE
processing <> 1
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
<dtml-if group_method_id>AND group_method_id = <dtml-sqlvar group_method_id type="string"> </dtml-if>
GROUP BY
path, method_id
ORDER BY
priority, date
<dtml-if group_method_id>
LIMIT 100
<dtml-else>
LIMIT 1
</dtml-if>
......@@ -13,7 +13,8 @@ message
priority
broadcast
date
processing_node=-1</params>
processing_node=-1
group_method_id</params>
INSERT INTO message
SET
path = <dtml-sqlvar path type="string">,
......@@ -23,4 +24,5 @@ SET
processing = -1,
priority = <dtml-sqlvar priority type="int">,
broadcast = <dtml-sqlvar broadcast type="int">,
group_method_id = <dtml-sqlvar group_method_id type="string">,
message = <dtml-sqlvar message type="string">
......@@ -13,20 +13,22 @@ message_list
priority_list
broadcast_list
date_list
processing_node_list</params>
processing_node_list
group_method_id_list</params>
INSERT INTO message
(path, date, method_id, processing_node, processing, priority, broadcast, message)
(path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, message)
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(path_list))">
<dtml-if sequence-start><dtml-else>,</dtml-if>
(
<dtml-sqlvar expr="path_list[loop_item]" type="string">,
<dtml-if date_list><dtml-sqlvar expr="date_list[loop_item]" type="string"><dtml-else><dtml-sqlvar "_.DateTime()" type="datetime"></dtml-if>,
<dtml-if date_list><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else><dtml-sqlvar "_.DateTime()" type="datetime"></dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-if processing_node_list><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
-1,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="broadcast_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
<dtml-sqlvar expr="message_list[loop_item]" type="string">
)
</dtml-in>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment