Commit 7b7da0af authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: add tests

parent 0b9a7dc5
...@@ -37,6 +37,7 @@ from Products.ERP5Type.Base import Base ...@@ -37,6 +37,7 @@ from Products.ERP5Type.Base import Base
from Products.CMFActivity.Activity.SQLBase import INVOKE_ERROR_STATE from Products.CMFActivity.Activity.SQLBase import INVOKE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.CMFActivity.Activity.SQLDict import SQLDict from Products.CMFActivity.Activity.SQLDict import SQLDict
from Products.CMFActivity.Activity.SQLJoblib import SQLJoblib
import Products.CMFActivity.ActivityTool import Products.CMFActivity.ActivityTool
from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
from erp5.portal_type import Organisation from erp5.portal_type import Organisation
...@@ -54,6 +55,9 @@ import transaction ...@@ -54,6 +55,9 @@ import transaction
class CommitFailed(Exception): class CommitFailed(Exception):
pass pass
def pickleable_function():
return "result1"
def registerFailingTransactionManager(*args, **kw): def registerFailingTransactionManager(*args, **kw):
from Shared.DC.ZRDB.TM import TM from Shared.DC.ZRDB.TM import TM
class dummy_tm(TM): class dummy_tm(TM):
...@@ -82,7 +86,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -82,7 +86,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
""" """
Return the list of business templates. Return the list of business templates.
""" """
return ('erp5_base',) return ('erp5_base', 'erp5_joblib')
def getCategoriesTool(self): def getCategoriesTool(self):
return getattr(self.getPortal(), 'portal_categories', None) return getattr(self.getPortal(), 'portal_categories', None)
...@@ -402,6 +406,30 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -402,6 +406,30 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list),0) self.assertEqual(len(message_list),0)
def TryActiveProcessWithResultDictActivated(self, activity):
"""
Try to store the result inside an active process using result list
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation._setTitle(self.title1)
active_process = portal.portal_activities.newActiveProcess()
self.assertEqual(self.title1,organisation.getTitle())
organisation.activate(activity=activity,active_process=active_process)._setTitle(self.title2)
# Needed so that the follow up messages which uses other queues
# are commited into the queue
self.commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
from Products.CMFActivity.Activity.SQLJoblib import sqljoblib_hash
# Test getResultDict API
result_dict = active_process.getResultDict()
self.assertEqual(result_dict[sqljoblib_hash((self.title2, ))].method_id, '_setTitle')
self.assertEqual(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list),0)
def TryMethodAfterMethod(self, activity): def TryMethodAfterMethod(self, activity):
""" """
Ensure the order of an execution by a method id Ensure the order of an execution by a method id
...@@ -718,6 +746,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -718,6 +746,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.DeferredSetTitleActivity('SQLQueue') self.DeferredSetTitleActivity('SQLQueue')
def test_03_DeferredSetTitleSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Deferred Set Title SQLJoblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferredSetTitleActivity('SQLJoblib')
def test_05_InvokeAndCancelSQLDict(self, quiet=0, run=run_all_test): def test_05_InvokeAndCancelSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order # Test if we can add a complete sales order
if not run: return if not run: return
...@@ -736,6 +773,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -736,6 +773,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.InvokeAndCancelActivity('SQLQueue') self.InvokeAndCancelActivity('SQLQueue')
def test_07_InvokeAndCancelSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Invoke And Cancel SQLJoblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.InvokeAndCancelActivity('SQLJoblib')
def test_09_CallOnceWithSQLDict(self, quiet=0, run=run_all_test): def test_09_CallOnceWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -754,6 +800,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -754,6 +800,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CallOnceWithActivity('SQLQueue') self.CallOnceWithActivity('SQLQueue')
def test_11_CallOnceWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nCall Once With SQLJoblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CallOnceWithActivity('SQLJoblib')
def test_13_TryMessageWithErrorOnSQLDict(self, quiet=0, run=run_all_test): def test_13_TryMessageWithErrorOnSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -771,6 +826,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -771,6 +826,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
ZopeTestCase._print(message) ZopeTestCase._print(message)
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryMessageWithErrorOnActivity('SQLQueue') self.TryMessageWithErrorOnActivity('SQLQueue')
def test_15_TryMessageWithErrorOnSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Message With Error On SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryMessageWithErrorOnActivity('SQLJoblib')
def test_17_TryFlushActivityWithSQLDict(self, quiet=0, run=run_all_test): def test_17_TryFlushActivityWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
...@@ -790,6 +854,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -790,6 +854,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryFlushActivity('SQLQueue') self.TryFlushActivity('SQLQueue')
def test_19_TryFlushActivityWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Flush Activity With SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryFlushActivity('SQLJoblib')
def test_21_TryActivateInsideFlushWithSQLDict(self, quiet=0, run=run_all_test): def test_21_TryActivateInsideFlushWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -808,6 +881,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -808,6 +881,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryActivateInsideFlush('SQLQueue') self.TryActivateInsideFlush('SQLQueue')
def test_23_TryActivateInsideFlushWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Activate Inside Flush With SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActivateInsideFlush('SQLJoblib')
def test_25_TryTwoMethodsWithSQLDict(self, quiet=0, run=run_all_test): def test_25_TryTwoMethodsWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -826,6 +908,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -826,6 +908,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryTwoMethods('SQLQueue') self.TryTwoMethods('SQLQueue')
def test_27_TryTwoMethodsWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Two Methods With SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryTwoMethods('SQLJoblib')
def test_29_TryTwoMethodsAndFlushThemWithSQLDict(self, quiet=0, run=run_all_test): def test_29_TryTwoMethodsAndFlushThemWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -844,6 +935,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -844,6 +935,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryTwoMethodsAndFlushThem('SQLQueue') self.TryTwoMethodsAndFlushThem('SQLQueue')
def test_31_TryTwoMethodsAndFlushThemWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Two Methods And Flush Them With SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryTwoMethodsAndFlushThem('SQLJoblib')
def test_33_TryActivateFlushActivateTicWithSQLDict(self, quiet=0, run=run_all_test): def test_33_TryActivateFlushActivateTicWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -908,6 +1009,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -908,6 +1009,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.DeferredSetTitleWithRenamedObject('SQLQueue') self.DeferredSetTitleWithRenamedObject('SQLQueue')
def test_44_TryRenameObjectWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Rename Object With SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferredSetTitleWithRenamedObject('SQLJoblib')
def test_46_TryActiveProcessWithSQLDict(self, quiet=0, run=run_all_test): def test_46_TryActiveProcessWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once # Test if we call methods only once
if not run: return if not run: return
...@@ -926,6 +1036,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -926,6 +1036,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryActiveProcess('SQLQueue') self.TryActiveProcess('SQLQueue')
def test_47_TryActiveProcessWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process With SQL Joblib '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcessWithResultDictActivated('SQLJoblib')
def test_54_TryAfterMethodIdWithSQLDict(self, quiet=0, run=run_all_test): def test_54_TryAfterMethodIdWithSQLDict(self, quiet=0, run=run_all_test):
# Test if after_method_id can be used # Test if after_method_id can be used
if not run: return if not run: return
...@@ -944,7 +1063,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -944,7 +1063,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryMethodAfterMethod('SQLQueue') self.TryMethodAfterMethod('SQLQueue')
def test_56_TryCallActivityWithRightUser(self, quiet=0, run=run_all_test): def test_56_TryAfterMethodIdWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if after_method_id can be used
if not run: return
if not quiet:
message = '\nTry Active Method After Another Activate Method With SQLJoblib'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryMethodAfterMethod('SQLJoblib')
def test_57_TryCallActivityWithRightUser(self, quiet=0, run=run_all_test):
# Test if me execute methods with the right user # Test if me execute methods with the right user
# This should be independant of the activity used # This should be independant of the activity used
if not run: return if not run: return
...@@ -990,7 +1118,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -990,7 +1118,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryAfterTag('SQLQueue') self.TryAfterTag('SQLQueue')
def test_61_CheckSchedulingWithSQLDict(self, quiet=0, run=run_all_test): def test_61_TryAfterTagWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if after_tag can be used
if not run: return
if not quiet:
message = '\nTry After Tag With SQL Joblib'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryAfterTag('SQLJoblib')
def test_62_CheckSchedulingWithSQLDict(self, quiet=0, run=run_all_test):
# Test if scheduling is correct with SQLDict # Test if scheduling is correct with SQLDict
if not run: return if not run: return
if not quiet: if not quiet:
...@@ -999,7 +1136,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -999,7 +1136,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckScheduling('SQLDict') self.CheckScheduling('SQLDict')
def test_62_CheckSchedulingWithSQLQueue(self, quiet=0, run=run_all_test): def test_63_CheckSchedulingWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if scheduling is correct with SQLQueue # Test if scheduling is correct with SQLQueue
if not run: return if not run: return
if not quiet: if not quiet:
...@@ -1008,7 +1145,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1008,7 +1145,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckScheduling('SQLQueue') self.CheckScheduling('SQLQueue')
def test_61_CheckSchedulingAfterTagListWithSQLDict(self, quiet=0, run=run_all_test): def test_64_CheckSchedulingWithSQLJoblib(self, quiet=0, run=run_all_test):
# Test if scheduling is correct with SQLQueue
if not run: return
if not quiet:
message = '\nCheck Scheduling With SQL Joblib'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckScheduling('SQLJoblib')
def test_65_CheckSchedulingAfterTagListWithSQLDict(self, quiet=0, run=run_all_test):
# Test if scheduling is correct with SQLDict # Test if scheduling is correct with SQLDict
if not run: return if not run: return
if not quiet: if not quiet:
...@@ -1017,7 +1163,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1017,7 +1163,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckSchedulingAfterTagList('SQLDict') self.CheckSchedulingAfterTagList('SQLDict')
def test_62_CheckSchedulingWithAfterTagListSQLQueue(self, quiet=0, run=run_all_test): def test_66_CheckSchedulingWithAfterTagListSQLQueue(self, quiet=0, run=run_all_test):
# Test if scheduling is correct with SQLQueue # Test if scheduling is correct with SQLQueue
if not run: return if not run: return
if not quiet: if not quiet:
...@@ -1026,6 +1172,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1026,6 +1172,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckSchedulingAfterTagList('SQLQueue') self.CheckSchedulingAfterTagList('SQLQueue')
def test_66_CheckSchedulingWithAfterTagListSQLJoblib(self, quiet=0, run=run_all_test):
# Test if scheduling is correct with SQLQueue
if not run: return
if not quiet:
message = '\nCheck Scheduling After Tag List With SQL Joblib'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckSchedulingAfterTagList('SQLJoblib')
def flushAllActivities(self, silent=0, loop_size=1000): def flushAllActivities(self, silent=0, loop_size=1000):
"""Executes all messages until the queue only contains failed """Executes all messages until the queue only contains failed
messages. messages.
...@@ -1047,7 +1202,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1047,7 +1202,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
if not silent: if not silent:
self.fail('flushAllActivities maximum loop count reached') self.fail('flushAllActivities maximum loop count reached')
def test_65_TestMessageValidationAndFailedActivities(self, def test_66_TestMessageValidationAndFailedActivities(self,
quiet=0, run=run_all_test): quiet=0, run=run_all_test):
"""after_method_id and failed activities. """after_method_id and failed activities.
...@@ -1069,13 +1224,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1069,13 +1224,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
obj = self.getPortal().organisation_module.newContent( obj = self.getPortal().organisation_module.newContent(
portal_type='Organisation', portal_type='Organisation',
title=original_title) title=original_title)
# Monkey patch Organisation to add a failing method # Monkey patch Organisation to add a failing method
def failingMethod(self): def failingMethod(self):
raise ValueError, 'This method always fail' raise ValueError, 'This method always fail'
Organisation.failingMethod = failingMethod Organisation.failingMethod = failingMethod
activity_list = ['SQLQueue', 'SQLDict', ] activity_list = ['SQLQueue', 'SQLDict', 'SQLJoblib']
for activity in activity_list: for activity in activity_list:
# reset # reset
activity_tool.manageClearActivities() activity_tool.manageClearActivities()
...@@ -1090,7 +1244,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1090,7 +1244,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
full_message_list = activity_tool.getMessageList() full_message_list = activity_tool.getMessageList()
remaining_messages = [a for a in full_message_list if a.method_id != remaining_messages = [a for a in full_message_list if a.method_id !=
'failingMethod'] 'failingMethod']
if len(full_message_list) != 2: if len(full_message_list) != 3:
self.fail('failingMethod should not have been flushed') self.fail('failingMethod should not have been flushed')
if len(remaining_messages) != 0: if len(remaining_messages) != 0:
self.fail('Activity tool should have no other remaining messages') self.fail('Activity tool should have no other remaining messages')
...@@ -1104,7 +1258,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1104,7 +1258,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
full_message_list = activity_tool.getMessageList() full_message_list = activity_tool.getMessageList()
remaining_messages = [a for a in full_message_list if a.method_id != remaining_messages = [a for a in full_message_list if a.method_id !=
'failingMethod'] 'failingMethod']
if len(full_message_list) != 3: if len(full_message_list) != 4:
self.fail('failingMethod should not have been flushed') self.fail('failingMethod should not have been flushed')
if len(remaining_messages) != 1: if len(remaining_messages) != 1:
self.fail('Activity tool should have one blocked setTitle activity') self.fail('Activity tool should have one blocked setTitle activity')
...@@ -1112,7 +1266,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1112,7 +1266,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
['failingMethod']) ['failingMethod'])
self.assertEqual(obj.getTitle(), original_title) self.assertEqual(obj.getTitle(), original_title)
def test_66_TestCountMessageWithTagWithSQLDict(self, quiet=0, run=run_all_test): def test_67_TestCountMessageWithTagWithSQLDict(self, quiet=0, run=run_all_test):
""" """
Test new countMessageWithTag function with SQLDict. Test new countMessageWithTag function with SQLDict.
""" """
...@@ -1123,7 +1277,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1123,7 +1277,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ', 0, message) LOG('Testing... ', 0, message)
self.CheckCountMessageWithTag('SQLDict') self.CheckCountMessageWithTag('SQLDict')
def test_67_TestCancelFailedActiveObject(self, quiet=0, run=run_all_test): def test_68_TestCancelFailedActiveObject(self, quiet=0, run=run_all_test):
"""Cancel an active object to make sure that it does not refer to """Cancel an active object to make sure that it does not refer to
a persistent object. a persistent object.
...@@ -1174,7 +1328,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1174,7 +1328,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.commit() self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 0) self.assertEqual(len(activity_tool.getMessageList()), 0)
def test_68_RetryMessageExecution(self, quiet=0): def test_69_RetryMessageExecution(self, quiet=0):
if not quiet: if not quiet:
message = '\nCheck number of executions of failing activities' message = '\nCheck number of executions of failing activities'
ZopeTestCase._print(message) ZopeTestCase._print(message)
...@@ -1194,7 +1348,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1194,7 +1348,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
raise ConflictError if conflict else Exception raise ConflictError if conflict else Exception
def check(retry_list, **activate_kw): def check(retry_list, **activate_kw):
fail = retry_list[-1][0] is not None and 1 or 0 fail = retry_list[-1][0] is not None and 1 or 0
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue', 'SQLJoblib':
exec_count[0] = 0 exec_count[0] = 0
activity_tool.activate(activity=activity, priority=priority(1,6), activity_tool.activate(activity=activity, priority=priority(1,6),
**activate_kw).doSomething(retry_list) **activate_kw).doSomething(retry_list)
...@@ -1257,6 +1411,28 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1257,6 +1411,28 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ', 0, message) LOG('Testing... ', 0, message)
self.TryConflictErrorsWhileValidating('SQLQueue') self.TryConflictErrorsWhileValidating('SQLQueue')
def test_72_TestConflictErrorsWhileValidatingWithSQLJoblib(self, quiet=0, run=run_all_test):
"""
Test if conflict errors spoil out active objects with SQLJoblib.
"""
if not run: return
if not quiet:
message = '\nTest Conflict Errors While Validating With SQLJoblib'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
self.TryConflictErrorsWhileValidating('SQLJoblib')
def test_71_TestConflictErrorsWhileValidatingWithSQLJoblib(self, quiet=0, run=run_all_test):
"""
Test if conflict errors spoil out active objects with SQLJoblib.
"""
if not run: return
if not quiet:
message = '\nTest Conflict Errors While Validating With SQLJoblib'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
self.TryConflictErrorsWhileValidating('SQLJoblib')
def test_72_TestErrorsWhileFinishingCommitDBWithSQLDict(self, quiet=0, run=run_all_test): def test_72_TestErrorsWhileFinishingCommitDBWithSQLDict(self, quiet=0, run=run_all_test):
""" """
""" """
...@@ -1439,7 +1615,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1439,7 +1615,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list),5) self.assertEqual(len(message_list),5)
portal.portal_activities.tic() portal.portal_activities.tic()
expected = dict(SQLDict=1, SQLQueue=5)[activity] expected = dict(SQLDict=1, SQLQueue=5, SQLJoblib=1)[activity]
self.assertEqual(expected, organisation.getFoobar()) self.assertEqual(expected, organisation.getFoobar())
...@@ -1470,9 +1646,9 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1470,9 +1646,9 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list),20) self.assertEqual(len(message_list),20)
portal.portal_activities.tic() portal.portal_activities.tic()
self.assertEqual(dict(SQLDict=11, SQLQueue=60)[activity], self.assertEqual(dict(SQLDict=11, SQLQueue=60, SQLJoblib=11)[activity],
organisation.getFoobar()) organisation.getFoobar())
self.assertEqual(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10])[activity], self.assertEqual(dict(SQLDict=[1, 1, 1], SQLQueue=[5, 5, 10], SQLJoblib=[1,1,1])[activity],
sorted(foobar_list)) sorted(foobar_list))
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEqual(len(message_list), 0) self.assertEqual(len(message_list), 0)
...@@ -1490,6 +1666,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1490,6 +1666,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
""" """
self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run) self.callWithGroupIdParamater('SQLQueue', quiet=quiet, run=run)
def test_80b_CallWithGroupIdParamaterSQLJoblib(self, quiet=0,
run=run_all_test):
"""
Test that group_id parameter is used to separate execution of the same method
"""
self.callWithGroupIdParamater('SQLJoblib', quiet=quiet, run=run)
def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test): def test_81_ActivateKwForWorkflowTransition(self, quiet=0, run=run_all_test):
""" """
Test call of a workflow transition with activate_kw parameter propagate them Test call of a workflow transition with activate_kw parameter propagate them
...@@ -1640,17 +1823,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1640,17 +1823,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally: finally:
del Organisation.failingMethod del Organisation.failingMethod
def test_90_userNotificationOnActivityFailureWithSQLDict(self, quiet=0, run=run_all_test): def test_90_userNotificationOnActivityFailureWithSQLJoblib(self, quiet=0, run=run_all_test):
""" """
Check that a user notification method is called on message when activity Check that a user notification method is called on message when activity
fails and will not be tried again. fails and will not be tried again.
""" """
if not run: return if not run: return
if not quiet: if not quiet:
message = '\nCheck user notification sent on activity final error (SQLDict)' message = '\nCheck user notification sent on activity final error (SQLJoblib)'
ZopeTestCase._print(message) ZopeTestCase._print(message)
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryUserNotificationOnActivityFailure('SQLDict') self.TryUserNotificationOnActivityFailure('SQLJoblib')
def test_91_userNotificationOnActivityFailureWithSQLQueue(self, quiet=0, run=run_all_test): def test_91_userNotificationOnActivityFailureWithSQLQueue(self, quiet=0, run=run_all_test):
""" """
...@@ -1707,6 +1890,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1707,6 +1890,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryUserNotificationRaise('SQLQueue') self.TryUserNotificationRaise('SQLQueue')
def test_94_userNotificationRaiseWithSQLJoblib(self, quiet=0, run=run_all_test):
"""
Check that activities are not left with processing=1 when notifyUser raises.
"""
if not run: return
if not quiet:
message = '\nCheck that activities are not left with processing=1 when notifyUser raises (SQLJoblib)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryUserNotificationRaise('SQLJoblib')
def TryActivityRaiseInCommitDoesNotStallActivityConection(self, activity): def TryActivityRaiseInCommitDoesNotStallActivityConection(self, activity):
""" """
Check that an activity which commit raises (as would a regular conflict Check that an activity which commit raises (as would a regular conflict
...@@ -1817,7 +2011,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1817,7 +2011,15 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryChangeSkinInActivity('SQLQueue') self.TryChangeSkinInActivity('SQLQueue')
def test_102_1_CheckSQLDictDoesNotDeleteSimilaritiesBeforeExecution(self, quiet=0, run=run_all_test): def test_102_TryChangeSkinInActivitySQLJoblib(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nTry ChangeSkin In Activity (SQLJoblib)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryChangeSkinInActivity('SQLJoblib')
def test_103_1_CheckSQLDictDoesNotDeleteSimilaritiesBeforeExecution(self, quiet=0, run=run_all_test):
""" """
Test that SQLDict does not delete similar messages which have the same Test that SQLDict does not delete similar messages which have the same
method_id and path but a different tag before execution. method_id and path but a different tag before execution.
...@@ -1849,7 +2051,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1849,7 +2051,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally: finally:
del activity_tool.__class__.doSomething del activity_tool.__class__.doSomething
def test_102_2_CheckSQLDictDeleteDuplicatesBeforeExecution(self, quiet=0, run=run_all_test): def test_103_2_CheckSQLDictDeleteDuplicatesBeforeExecution(self, quiet=0, run=run_all_test):
""" """
Test that SQLDict delete the same messages before execution if messages Test that SQLDict delete the same messages before execution if messages
has the same method_id and path and tag. has the same method_id and path and tag.
...@@ -1884,7 +2086,42 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1884,7 +2086,42 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally: finally:
del activity_tool.__class__.doSomething del activity_tool.__class__.doSomething
def test_102_3_CheckSQLDictDistributeWithSerializationTagAndGroupMethodId( def test_103_3_CheckSQLJoblibDeleteDuplicatesBeforeExecution(self, quiet=0, run=run_all_test):
"""
Test that SQLJoblib delete the same messages before execution if messages
has the same method_id and path and tag and signature.
"""
if not run: return
if not quiet:
message = '\nCheck duplicates are deleted before execution of original message (SQLJoblib)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
activity_tool = self.getActivityTool()
marker = []
def doSomething(self, other_tag):
marker.append(self.countMessage(tag=other_tag))
activity_tool.__class__.doSomething = doSomething
try:
# Adds two same activities.
activity_tool.activate(activity='SQLJoblib', after_tag='foo', priority=2,
tag='a', signature=555666).doSomething(other_tag='a')
self.commit()
uid1, = [x.uid for x in activity_tool.getMessageList()]
activity_tool.activate(activity='SQLJoblib', after_tag='bar', priority=1,
tag='a', signature=555666).doSomething(other_tag='a')
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
# After distribute, duplicate is deleted.
uid2, = [x.uid for x in activity_tool.getMessageList()]
self.assertNotEqual(uid1, uid2)
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(marker, [1])
finally:
del activity_tool.__class__.doSomething
def test_103_4_CheckSQLDictDistributeWithSerializationTagAndGroupMethodId(
self, quiet=0): self, quiet=0):
""" """
Distribuation was at some point buggy with this scenario when there was Distribuation was at some point buggy with this scenario when there was
...@@ -1908,7 +2145,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1908,7 +2145,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.tic() self.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0) self.assertEqual(len(activity_tool.getMessageList()), 0)
def test_103_interQueuePriorities(self, quiet=0, run=run_all_test): def test_104_interQueuePriorities(self, quiet=0, run=run_all_test):
""" """
Important note: there is no way to really reliably check that this Important note: there is no way to really reliably check that this
feature is correctly implemented, as activity execution order is feature is correctly implemented, as activity execution order is
...@@ -2004,6 +2241,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2004,6 +2241,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckActivityRuntimeEnvironment('SQLQueue') self.CheckActivityRuntimeEnvironment('SQLQueue')
def test_105_activityRuntimeEnvironmentSQLJoblib(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck ActivityRuntimeEnvironment (SQLJoblib)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckActivityRuntimeEnvironment('SQLJoblib')
def CheckSerializationTag(self, activity): def CheckSerializationTag(self, activity):
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation') organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
self.tic() self.tic()
...@@ -2323,7 +2568,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2323,7 +2568,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertFalse(active_process.hasActivity()) self.assertFalse(active_process.hasActivity())
def test(obj, **kw): def test(obj, **kw):
for activity in ('SQLDict', 'SQLQueue'): for activity in ('SQLDict', 'SQLQueue', 'SQLJoblib'):
active_object.activate(activity=activity, **kw).getTitle() active_object.activate(activity=activity, **kw).getTitle()
self.commit() self.commit()
self.assertTrue(obj.hasActivity(), activity) self.assertTrue(obj.hasActivity(), activity)
...@@ -2371,6 +2616,9 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2371,6 +2616,9 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def test_hasErrorActivity_error_SQLDict(self): def test_hasErrorActivity_error_SQLDict(self):
self._test_hasErrorActivity_error('SQLDict') self._test_hasErrorActivity_error('SQLDict')
def test_hasErrorActivity_error_SQLJoblib(self):
self._test_hasErrorActivity_error('SQLJoblib')
def _test_hasErrorActivity(self, activity): def _test_hasErrorActivity(self, activity):
active_object = self.portal.organisation_module.newContent( active_object = self.portal.organisation_module.newContent(
portal_type='Organisation') portal_type='Organisation')
...@@ -2403,6 +2651,9 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2403,6 +2651,9 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def test_hasErrorActivity_SQLDict(self): def test_hasErrorActivity_SQLDict(self):
self._test_hasErrorActivity('SQLDict') self._test_hasErrorActivity('SQLDict')
def test_hasErrorActivity_SQLJoblib(self):
self._test_hasErrorActivity('SQLJoblib')
def test_active_object_hasActivity_does_not_catch_exceptions(self): def test_active_object_hasActivity_does_not_catch_exceptions(self):
""" """
Some time ago, hasActivity was doing a silent try/except, and this was Some time ago, hasActivity was doing a silent try/except, and this was
...@@ -2442,7 +2693,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2442,7 +2693,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.__call_count += 1 self.__call_count += 1
o = self.portal.organisation_module.newContent(portal_type='Organisation') o = self.portal.organisation_module.newContent(portal_type='Organisation')
for activity in "SQLDict", "SQLQueue": for activity in "SQLDict", "SQLQueue", "SQLJoblib":
self.__call_count = 0 self.__call_count = 0
try: try:
for i in xrange(10): for i in xrange(10):
...@@ -2538,7 +2789,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2538,7 +2789,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool = self.portal.portal_activities activity_tool = self.portal.portal_activities
activity_tool.__class__.doSomething = processed.append activity_tool.__class__.doSomething = processed.append
try: try:
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue', 'SQLJoblib':
activity_tool.activate(activity=activity).doSomething(activity) activity_tool.activate(activity=activity).doSomething(activity)
self.commit() self.commit()
# Make first commit in dequeueMessage raise # Make first commit in dequeueMessage raise
...@@ -2622,6 +2873,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2622,6 +2873,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del Organisation.failingMethod del Organisation.failingMethod
self._ignore_log_errors() self._ignore_log_errors()
def test_118_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLJoblib(self, quiet=0, run=run_all_test):
"""
Check the error is saved on event log even if the mail notification is not sent.
"""
if not run: return
if not quiet:
message = '\nCheck the error is saved on event log even if the mail notification is not sent (SQLJoblib)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryNotificationSavedOnEventLogWhenNotifyUserRaises('SQLJoblib')
def test_118_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLDict(self, quiet=0, run=run_all_test): def test_118_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLDict(self, quiet=0, run=run_all_test):
""" """
Check the error is saved on event log even if the mail notification is not sent. Check the error is saved on event log even if the mail notification is not sent.
...@@ -2692,6 +2954,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2692,6 +2954,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryUserMessageContainingNoTracebackIsStillSent('SQLDict') self.TryUserMessageContainingNoTracebackIsStillSent('SQLDict')
def test_121_sendMessageWithNoTracebackWithSQLJoblib(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck that message with no traceback is still sent (SQLJoblib)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryUserMessageContainingNoTracebackIsStillSent('SQLJoblib')
def TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises(self, activity): def TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises(self, activity):
# Make sure that no active object is installed. # Make sure that no active object is installed.
activity_tool = self.getPortal().portal_activities activity_tool = self.getPortal().portal_activities
...@@ -2735,6 +3005,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2735,6 +3005,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del Organisation.failingMethod del Organisation.failingMethod
self._ignore_log_errors() self._ignore_log_errors()
def test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLJoblib(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck that message not saved in site error logger is not lost'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises('SQLJoblib')
def test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLDict(self, quiet=0, run=run_all_test): def test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLDict(self, quiet=0, run=run_all_test):
if not run: return if not run: return
if not quiet: if not quiet:
...@@ -2797,7 +3075,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2797,7 +3075,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
group_method_call_list.append(r) group_method_call_list.append(r)
activity_tool.__class__.doSomething = doSomething activity_tool.__class__.doSomething = doSomething
try: try:
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue', 'SQLJoblib':
activity_kw = dict(activity=activity, serialization_tag=self.id(), activity_kw = dict(activity=activity, serialization_tag=self.id(),
group_method_id='portal_activities/doSomething') group_method_id='portal_activities/doSomething')
obj1.activate(**activity_kw).dummy(1, x=None) obj1.activate(**activity_kw).dummy(1, x=None)
...@@ -2820,7 +3098,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2820,7 +3098,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.tic() activity_tool.tic()
self.assertEqual(group_method_call_list.pop(), self.assertEqual(group_method_call_list.pop(),
dict(SQLDict=[message2], dict(SQLDict=[message2],
SQLQueue=[message1, message2])[activity]) SQLQueue=[message1, message2],
SQLJoblib=[message2])[activity])
self.assertFalse(group_method_call_list) self.assertFalse(group_method_call_list)
self.assertFalse(activity_tool.getMessageList()) self.assertFalse(activity_tool.getMessageList())
finally: finally:
...@@ -2931,7 +3210,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2931,7 +3210,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
transaction.get().addBeforeCommitHook(_raise, (error,)) transaction.get().addBeforeCommitHook(_raise, (error,))
obj.__class__.doSomething = doSomething obj.__class__.doSomething = doSomething
try: try:
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue', 'SQLJoblib':
for conflict_error in False, True: for conflict_error in False, True:
weakref_list = [] weakref_list = []
obj.activity_count = obj.on_error_count = 0 obj.activity_count = obj.on_error_count = 0
...@@ -3062,7 +3341,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3062,7 +3341,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
kw = {} kw = {}
self._catch_log_errors(subsystem='CMFActivity') self._catch_log_errors(subsystem='CMFActivity')
try: try:
for kw['activity'] in 'SQLDict', 'SQLQueue': for kw['activity'] in 'SQLDict', 'SQLQueue', 'SQLJoblib':
for kw['group_method_id'] in '', None: for kw['group_method_id'] in '', None:
obj = activity_tool.newActiveProcess() obj = activity_tool.newActiveProcess()
self.tic() self.tic()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment