Commit 14de76a1 authored by Jérome Perrin's avatar Jérome Perrin

Added a test to make sure method are called even if they are activated

after other methods using after_method_id and those other method fails.



git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@7137 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent f269939d
...@@ -36,7 +36,7 @@ from random import randint ...@@ -36,7 +36,7 @@ from random import randint
import os, sys import os, sys
if __name__ == '__main__': if __name__ == '__main__':
execfile(os.path.join(sys.path[0], 'framework.py')) execfile(os.path.join(sys.path[0], 'framework.py'))
# Needed in order to have a log file inside the current folder # Needed in order to have a log file inside the current folder
os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log') os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log')
...@@ -44,6 +44,10 @@ os.environ['EVENT_LOG_SEVERITY'] = '-300' ...@@ -44,6 +44,10 @@ os.environ['EVENT_LOG_SEVERITY'] = '-300'
from Testing import ZopeTestCase from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.Document.Organisation import Organisation
from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager
from DateTime import DateTime from DateTime import DateTime
from Acquisition import aq_base, aq_inner from Acquisition import aq_base, aq_inner
...@@ -186,7 +190,6 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -186,7 +190,6 @@ class TestCMFActivity(ERP5TypeTestCase):
self.foobar = 1 self.foobar = 1
def getFoobar(self): def getFoobar(self):
return (getattr(self,'foobar',0)) return (getattr(self,'foobar',0))
from Products.ERP5Type.Document.Organisation import Organisation
organisation = portal.organisation._getOb(self.company_id) organisation = portal.organisation._getOb(self.company_id)
Organisation.setFoobar = setFoobar Organisation.setFoobar = setFoobar
Organisation.getFoobar = getFoobar Organisation.getFoobar = getFoobar
...@@ -1219,6 +1222,89 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -1219,6 +1222,89 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.CheckClearActivities('SQLQueue') self.CheckClearActivities('SQLQueue')
def flushAllActivities(self):
"""Executes all messages until the queue only contains failed
messages.
"""
activity_tool = self.getPortal().portal_activities
loop_count=0
# flush activities
while 1:
loop_count += 1
if loop_count >= 1000:
self.fail('flushAllActivities maximum loop count reached')
activity_tool.distribute(node_count=1)
activity_tool.tic(processing_node=1)
finished = 1
for message in activity_tool.getMessageList():
if message.processing_node not in (INVOKE_ERROR_STATE,
VALIDATE_ERROR_STATE):
finished = 0
activity_tool.timeShift(3 * VALIDATION_ERROR_DELAY)
get_transaction().commit()
if finished:
return
def test_65_TestMessageValidationAndFailedActivities(self,
quiet=0, run=run_all_test):
"""after_method_id and failed activities.
Tests that if we have an active method scheduled by
after_method_id and a failed activity with this method id, the
method is executed."""
if not run: return
if not quiet:
message = '\nafter_method_id and failed activities'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
activity_tool = self.getPortal().portal_activities
obj = self.getPortal().organisation_module.newContent(
portal_type='Organisation')
# Monkey patch Organisation to add a failing method
def failingMethod(self):
raise ValueError, 'This method always fail'
Organisation.failingMethod = failingMethod
# Monkey patch Message not to send failure notification emails
from Products.CMFActivity.ActivityTool import Message
originalNotifyUser = Message.notifyUser
def notifyUserSilent(self, activity_tool, message=''):
pass
Message.notifyUser = notifyUserSilent
activity_list = ['SQLQueue', 'SQLDict', ]
for activity in activity_list:
# reset
activity_tool.manageClearActivities()
obj.setTitle('something')
get_transaction().commit()
# activate failing message and flush
for fail_activity in activity_list:
obj.activate(activity = fail_activity).failingMethod()
get_transaction().commit()
self.flushAllActivities()
message_count = len(activity_tool.getMessageList())
if message_count == 0:
self.fail('Activity tool should have remaining messages')
# activate our message
new_title = 'nothing'
obj.activate(after_method_id = ['failingMethod'],
activity = activity ).setTitle(new_title)
get_transaction().commit()
self.flushAllActivities()
self.assertEquals(message_count,
len(activity_tool.getMessageList()))
self.assertEquals(obj.getTitle(), new_title)
# restore notification
Message.notifyUser = originalNotifyUser
if __name__ == '__main__': if __name__ == '__main__':
framework() framework()
else: else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment