Commit 5badc64c authored by Jean-Paul Smets's avatar Jean-Paul Smets

implementation of time parameters and validation order methods. Only SQLDict...

implementation of time parameters and validation order methods. Only SQLDict supports these extensions at this time


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1290 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent e8132078
...@@ -47,6 +47,24 @@ class ActiveObject(ExtensionClass.Base): ...@@ -47,6 +47,24 @@ class ActiveObject(ExtensionClass.Base):
security = ClassSecurityInfo() security = ClassSecurityInfo()
def activate(self, activity=DEFAULT_ACTIVITY, active_process=None, passive_commit=0, **kw): def activate(self, activity=DEFAULT_ACTIVITY, active_process=None, passive_commit=0, **kw):
"""
Reserved Optional parameters
at_date -- request execution date for this activate call
after_method_id -- never validate message if after_method_id
is in the list of methods which are
going to be executed
after_message_uid -- never validate message if after_message_uid
is in the list of messages which are
going to be executed
after_path -- never validate message if after_path
is in the list of path which are
going to be executed
"""
activity_tool = getattr(self, 'portal_activities', None) activity_tool = getattr(self, 'portal_activities', None)
if activity_tool is None: return self # Do nothing if no portal_activities if activity_tool is None: return self # Do nothing if no portal_activities
# activate returns an ActiveWrapper # activate returns an ActiveWrapper
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
# #
############################################################################## ##############################################################################
import pickle import pickle, sys
from Acquisition import aq_base from Acquisition import aq_base
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
from zLOG import LOG from zLOG import LOG
...@@ -37,6 +37,11 @@ VALID = 0 ...@@ -37,6 +37,11 @@ VALID = 0
INVALID_PATH = 1 INVALID_PATH = 1
INVALID_ORDER = 2 INVALID_ORDER = 2
# Time global parameters
SECONDS_IN_DAY = 86400.0
MAX_PROCESSING_TIME = 900 / SECONDS_IN_DAY # in fractions of day
VALIDATION_ERROR_DELAY = 120 / SECONDS_IN_DAY # in fractions of day
class Queue: class Queue:
""" """
Step 1: use lists Step 1: use lists
...@@ -142,11 +147,12 @@ class Queue: ...@@ -142,11 +147,12 @@ class Queue:
'Object %s does not exist' % '/'.join(message.object_path)) 'Object %s does not exist' % '/'.join(message.object_path))
return INVALID_PATH return INVALID_PATH
for k, v in kw.items(): for k, v in kw.items():
if activity_tool.validateOrder(k, message, v): if activity_tool.validateOrder(message, k, v):
return INVALID_ORDER return INVALID_ORDER
except: except:
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Object %s could not be accessed' % '/'.join(message.object_path)) 'Validation of Object %s raised exception' % '/'.join(message.object_path),
error=sys.exc_info())
# Do not try to call methods on objects which cause errors # Do not try to call methods on objects which cause errors
return EXCEPTION return EXCEPTION
return VALID return VALID
...@@ -219,3 +225,10 @@ class Queue: ...@@ -219,3 +225,10 @@ class Queue:
return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name)) return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
else: else:
return () return ()
# Required for tests (time shift)
def timeShift(self, activity_tool, delay):
"""
delay is provided in fractions of day
"""
pass
\ No newline at end of file
...@@ -27,8 +27,9 @@ ...@@ -27,8 +27,9 @@
############################################################################## ##############################################################################
import random import random
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
...@@ -55,10 +56,11 @@ class SQLDict(RAMDict): ...@@ -55,10 +56,11 @@ class SQLDict(RAMDict):
# Transaction commit methods # Transaction commit methods
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
if m.is_registered: if m.is_registered:
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m)) message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime()))
# Also store uid of activity # Also store uid of activity
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
...@@ -104,41 +106,56 @@ class SQLDict(RAMDict): ...@@ -104,41 +106,56 @@ class SQLDict(RAMDict):
# Queue semantic # Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLDict_readMessageList'): if hasattr(activity_tool,'SQLDict_readMessage'):
now_date = DateTime()
# Sticky processing messages should be set back to non processing
max_processing_date = now_date + MAX_PROCESSING_TIME
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY
priority = random.choice(priority_weight) priority = random.choice(priority_weight)
# Try to find a message at given priority level # Try to find a message at given priority level which is scheduled for now
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date, to_processing_date = max_processing_date)
if len(result) == 0: if len(result) == 0:
# If empty, take any message # If empty, take any message which is scheduled for now
priority = None priority = None
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None ) uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None, to_date=now_date )
uid_list = map(lambda x:x.uid, uid_list) uid_list = map(lambda x:x.uid, uid_list)
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
if len(uid_list) > 0: if len(uid_list) > 0:
# Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
# Validate message (make sure object exists, priority OK, etc.) # Validate message (make sure object exists, priority OK, etc.)
if m.validate(self, activity_tool) is not VALID: validation_state = m.validate(self, activity_tool)
if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH):
# There is a serious validation error
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
if len(uid_list) > 0: # Add some delay here if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error #m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
if len(uid_list) > 0: if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = line.priority + 1) priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# Try to invoke # Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
...@@ -163,7 +180,7 @@ class SQLDict(RAMDict): ...@@ -163,7 +180,7 @@ class SQLDict(RAMDict):
else: else:
# Lower priority # Lower priority
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = line.priority + 1) priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
...@@ -268,4 +285,30 @@ class SQLDict(RAMDict): ...@@ -268,4 +285,30 @@ class SQLDict(RAMDict):
if processing_node > node_count: if processing_node > node_count:
processing_node = 1 # Round robin processing_node = 1 # Round robin
# Validation private methods
def _validate_after_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id
result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_path(self, activity_tool, message, value):
# Count number of occurances of path
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_message_uid(self, activity_tool, message, value):
# Count number of occurances of message_uid
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
# Required for tests (time shift)
def timeShift(self, activity_tool, delay):
activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY)
registerActivity(SQLDict) registerActivity(SQLDict)
...@@ -423,8 +423,16 @@ class ActivityTool (Folder, UniqueObject): ...@@ -423,8 +423,16 @@ class ActivityTool (Folder, UniqueObject):
for activity in activity_list: for activity in activity_list:
method_id = "_validate_%s" % validator_id method_id = "_validate_%s" % validator_id
if hasattr(activity, method_id): if hasattr(activity, method_id):
LOG('CMFActivity: ', 0, 'validateOrder calling method_id %s' % method_id)
if getattr(activity,method_id)(self, message, validation_value): if getattr(activity,method_id)(self, message, validation_value):
return 1 return 1
return 0 return 0
# Required for tests (time shift)
def timeShift(self, delay):
global is_initialized
if not is_initialized: self.initialize()
for activity in activity_list:
activity.timeShift(self, delay)
InitializeClass(ActivityTool) InitializeClass(ActivityTool)
...@@ -15,6 +15,7 @@ CREATE TABLE `message` ( ...@@ -15,6 +15,7 @@ CREATE TABLE `message` (
`method_id` VARCHAR(40), `method_id` VARCHAR(40),
`processing_node` INT DEFAULT -1, `processing_node` INT DEFAULT -1,
`processing` INT DEFAULT 0, `processing` INT DEFAULT 0,
`processing_date` datetime,
`priority` INT DEFAULT 0, `priority` INT DEFAULT 0,
`message` BLOB, `message` BLOB,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
...@@ -23,5 +24,6 @@ CREATE TABLE `message` ( ...@@ -23,5 +24,6 @@ CREATE TABLE `message` (
KEY `method_id` (`method_id`), KEY `method_id` (`method_id`),
KEY `processing_node` (`processing_node`), KEY `processing_node` (`processing_node`),
KEY `processing` (`processing`), KEY `processing` (`processing`),
KEY `processing_date` (`processing_date`),
KEY `priority` (`priority`) KEY `priority` (`priority`)
) TYPE = InnoDB; ) TYPE = InnoDB;
...@@ -9,7 +9,9 @@ class_file: ...@@ -9,7 +9,9 @@ class_file:
</dtml-comment> </dtml-comment>
<params>uid</params> <params>uid</params>
UPDATE message UPDATE message
SET processing=1 SET
processing_date = <dtml-sqlvar "_.DateTime()" type="string">,
processing = 1
WHERE WHERE
<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> <dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>
OR </dtml-if></dtml-in> OR </dtml-if></dtml-in>
\ No newline at end of file
...@@ -8,13 +8,26 @@ class_name: ...@@ -8,13 +8,26 @@ class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>processing_node <params>processing_node
priority</params> priority
SELECT * FROM to_date
to_processing_date</params>
<dtml-if to_processing_date>UPDATE message
SET
processing = 0
WHERE
processing = 1
AND
processing_date < <dtml-sqlvar to_processing_date type="string">
<dtml-var "'\0'">
</dtml-if>SELECT * FROM
message message
WHERE WHERE
processing <> 1 processing <> 1
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="string"> </dtml-if>
ORDER BY ORDER BY
priority, date priority, date
...@@ -9,11 +9,13 @@ class_file: ...@@ -9,11 +9,13 @@ class_file:
</dtml-comment> </dtml-comment>
<params>processing_node <params>processing_node
method_id method_id
path</params> path
to_date</params>
SELECT uid FROM SELECT uid FROM
message message
WHERE WHERE
processing <> 1 processing <> 1
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
\ No newline at end of file <dtml-if to_date>AND date <= <dtml-sqlvar to_date type="string"> </dtml-if>
...@@ -8,12 +8,14 @@ class_name: ...@@ -8,12 +8,14 @@ class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid:list <params>uid:list
priority</params> priority
date</params>
UPDATE UPDATE
message message
SET SET
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
processing = 0 processing = 0,
date = <dtml-sqlvar date type="string">
WHERE WHERE
<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> <dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>
OR </dtml-if></dtml-in> OR </dtml-if></dtml-in>
\ No newline at end of file
<dtml-comment>
title:
connection_id:erp5_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>delay</params>
UPDATE
message
SET
date = date - <dtml-sqlvar delay type="int">,
processing_date = processing_date - <dtml-sqlvar delay type="int">
<dtml-comment>
title:
connection_id:erp5_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>method_id
message_uid
path
</params>
SELECT
COUNT(DISTINCT uid) as uid_count
FROM
message
WHERE
processing_node >= -1
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
...@@ -10,11 +10,12 @@ class_file: ...@@ -10,11 +10,12 @@ class_file:
<params>path <params>path
method_id method_id
message message
priority</params> priority
date</params>
INSERT INTO message INSERT INTO message
SET SET
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
date = <dtml-sqlvar "_.DateTime()" type="string">, <dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if>
method_id = <dtml-sqlvar method_id type="string">, method_id = <dtml-sqlvar method_id type="string">,
processing_node = -1, processing_node = -1,
processing = -1, processing = -1,
......
...@@ -8,7 +8,9 @@ class_name: ...@@ -8,7 +8,9 @@ class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>uid</params> <params>uid</params>
UPDATE message_queue UPDATE
SET processing=1 message_queue
SET
processing=1
WHERE WHERE
uid = <dtml-sqlvar uid type="int"> uid = <dtml-sqlvar uid type="int">
...@@ -337,6 +337,7 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -337,6 +337,7 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(len(message_list),1) self.assertEquals(len(message_list),1)
portal.portal_activities.distribute() portal.portal_activities.distribute()
portal.portal_activities.tic() portal.portal_activities.tic()
# XXX HERE WE SHOULD USE TIME SHIFT IN ORDER TO SIMULATE MULTIPLE TICS
# Test if there is still the message after it crashed # Test if there is still the message after it crashed
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1) self.assertEquals(len(message_list),1)
...@@ -421,6 +422,42 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -421,6 +422,42 @@ class TestCMFActivity(ERP5TypeTestCase):
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0) self.assertEquals(len(message_list),0)
def TryMethodAfterMethod(self, activity):
"""
Try several activities
"""
portal = self.getPortal()
def DeferredSetDescription(self,value):
self.setDescription(value)
def DeferredSetTitle(self,value):
self.setTitle(value)
from Products.ERP5Type.Document.Organisation import Organisation
Organisation.DeferredSetTitle = DeferredSetTitle
Organisation.DeferredSetDescription = DeferredSetDescription
organisation = portal.organisation._getOb(self.company_id)
default_title = 'my_test_title'
organisation.setTitle(default_title)
organisation.setDescription(None)
organisation.activate(activity=activity,after_method_id='DeferredSetDescription').DeferredSetTitle(self.title1)
organisation.activate(activity=activity).DeferredSetDescription(self.title1)
get_transaction().commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
self.assertEquals(organisation.getTitle(), default_title) # Title should not be changed the first time
self.assertEquals(organisation.getDescription(),self.title1)
# Now wait some time and test again (this should be simulated by changing dates in SQL Queue)
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
portal.portal_activities.timeShift(2 * VALIDATION_ERROR_DELAY)
portal.portal_activities.tic()
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.assertEquals(organisation.getTitle(),self.title1)
self.assertEquals(organisation.getDescription(),self.title1)
def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test): def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order # Test if we can add a complete sales order
if not run: return if not run: return
...@@ -899,6 +936,16 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -899,6 +936,16 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryActiveProcessInsideActivity('RAMQueue') self.TryActiveProcessInsideActivity('RAMQueue')
def test_54_TryAfterMethodIdWithSQLDict(self, quiet=0, run=run_all_test):
# Test if after_method_id can be used
if not run: return
if not quiet:
message = '\nTry Active Method After Another Activate Method'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryMethodAfterMethod('SQLDict')
if __name__ == '__main__': if __name__ == '__main__':
framework() framework()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment