From 8aa2d4f42198d80ea524cdc9bff0c89af6a24169 Mon Sep 17 00:00:00 2001 From: Sebastien Robin <seb@nexedi.com> Date: Mon, 26 Apr 2004 13:59:59 +0000 Subject: [PATCH] cleaned CMFActivity unit test and added two more test git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@736 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/CMFActivity/tests/testCMFActivity.py | 330 ++++++------------- 1 file changed, 105 insertions(+), 225 deletions(-) diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index d1a0d6480b..a81498ed7d 100755 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -113,40 +113,40 @@ class TestCMFActivity(ERP5TypeTestCase): user = uf.getUserById('seb').__of__(uf) newSecurityManager(None, user) - def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test): - # Test if we can add a complete sales order - if not run: return - if not quiet: - message = '\nTest Defered Set Title SQLDict ' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) + def InvokeAndCancelActivity(self, activity): portal = self.getPortal() organisation = portal.organisation._getOb(self.company_id) organisation.setTitle(self.title1) self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='SQLDict').setTitle(self.title2) - organisation.reindexObject() + organisation.activate(activity=activity).setTitle(self.title2) # Needed so that the message are commited into the queue get_transaction().commit() + message_list = portal.portal_activities.getMessageList() + self.assertEquals(len(message_list),1) + portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle') + # Needed so that the message are removed from the queue + get_transaction().commit() self.assertEquals(self.title1,organisation.getTitle()) - portal.portal_activities.distribute() - portal.portal_activities.tic() + message_list = portal.portal_activities.getMessageList() + self.assertEquals(len(message_list),0) + organisation.activate(activity=activity).setTitle(self.title2) + # Needed so that the message are commited into the queue + get_transaction().commit() + message_list = portal.portal_activities.getMessageList() + self.assertEquals(len(message_list),1) + portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle') + # Needed so that the message are removed from the queue + get_transaction().commit() self.assertEquals(self.title2,organisation.getTitle()) message_list = portal.portal_activities.getMessageList() self.assertEquals(len(message_list),0) - def test_02_DeferedSetTitleSQLQueue(self, quiet=0, run=run_all_test): - # Test if we can add a complete sales order - if not run: return - if not quiet: - message = '\nTest Defered Set Title SQLQueue ' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) + def DeferedSetTitleActivity(self, activity): portal = self.getPortal() organisation = portal.organisation._getOb(self.company_id) organisation.setTitle(self.title1) self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='SQLQueue').setTitle(self.title2) + organisation.activate(activity=activity).setTitle(self.title2) # Needed so that the message are commited into the queue get_transaction().commit() self.assertEquals(self.title1,organisation.getTitle()) @@ -156,26 +156,70 @@ class TestCMFActivity(ERP5TypeTestCase): message_list = portal.portal_activities.getMessageList() self.assertEquals(len(message_list),0) - def test_03_DeferedSetTitleRAMDict(self, quiet=0, run=run_all_test): - # Test if we can add a complete sales order - if not run: return - if not quiet: - message = '\nTest Defered Set Title RAMDict ' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) + def CallOnceWithActivity(self, activity): portal = self.getPortal() + def setFoobar(self): + if hasattr(self,'foobar'): + self.foobar = self.foobar + 1 + else: + self.foobar = 1 + def getFoobar(self): + return (getattr(self,'foobar',0)) + from Products.ERP5Type.Document.Organisation import Organisation organisation = portal.organisation._getOb(self.company_id) + Organisation.setFoobar = setFoobar + Organisation.getFoobar = getFoobar + organisation.foobar = 0 organisation.setTitle(self.title1) - self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='RAMDict').setTitle(self.title2) + self.assertEquals(0,organisation.getFoobar()) + organisation.activate(activity=activity).setFoobar() # Needed so that the message are commited into the queue get_transaction().commit() - self.assertEquals(self.title1,organisation.getTitle()) + message_list = portal.portal_activities.getMessageList() + self.assertEquals(len(message_list),1) portal.portal_activities.distribute() portal.portal_activities.tic() - self.assertEquals(self.title2,organisation.getTitle()) + self.assertEquals(1,organisation.getFoobar()) message_list = portal.portal_activities.getMessageList() self.assertEquals(len(message_list),0) + organisation.activate(activity=activity).setFoobar() + # Needed so that the message are commited into the queue + get_transaction().commit() + message_list = portal.portal_activities.getMessageList() + self.assertEquals(len(message_list),1) + portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setFoobar') + # Needed so that the message are commited into the queue + get_transaction().commit() + message_list = portal.portal_activities.getMessageList() + self.assertEquals(len(message_list),0) + self.assertEquals(2,organisation.getFoobar()) + + def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test): + # Test if we can add a complete sales order + if not run: return + if not quiet: + message = '\nTest Defered Set Title SQLDict ' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.DeferedSetTitleActivity('SQLDict') + + def test_02_DeferedSetTitleSQLQueue(self, quiet=0, run=run_all_test): + # Test if we can add a complete sales order + if not run: return + if not quiet: + message = '\nTest Defered Set Title SQLQueue ' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.DeferedSetTitleActivity('SQLQueue') + + def test_03_DeferedSetTitleRAMDict(self, quiet=0, run=run_all_test): + # Test if we can add a complete sales order + if not run: return + if not quiet: + message = '\nTest Defered Set Title RAMDict ' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.DeferedSetTitleActivity('RAMDict') def test_04_DeferedSetTitleRAMQueue(self, quiet=0, run=run_all_test): # Test if we can add a complete sales order @@ -184,19 +228,7 @@ class TestCMFActivity(ERP5TypeTestCase): message = '\nTest Defered Set Title RAMQueue ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - organisation = portal.organisation._getOb(self.company_id) - organisation.setTitle(self.title1) - self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='RAMQueue').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - self.assertEquals(self.title1,organisation.getTitle()) - portal.portal_activities.distribute() - portal.portal_activities.tic() - self.assertEquals(self.title2,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) + self.DeferedSetTitleActivity('RAMQueue') def test_05_InvokeAndCancelSQLDict(self, quiet=0, run=run_all_test): # Test if we can add a complete sales order @@ -205,32 +237,7 @@ class TestCMFActivity(ERP5TypeTestCase): message = '\nTest Invoke And Cancel SQLDict ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - organisation = portal.organisation._getOb(self.company_id) - organisation.setTitle(self.title1) - self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='SQLDict').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title1,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - organisation.activate(activity='SQLDict').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title2,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) + self.InvokeAndCancelActivity('SQLDict') def test_06_InvokeAndCancelSQLQueue(self, quiet=0, run=run_all_test): # Test if we can add a complete sales order @@ -239,188 +246,61 @@ class TestCMFActivity(ERP5TypeTestCase): message = '\nTest Invoke And Cancel SQLQueue ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - organisation = portal.organisation._getOb(self.company_id) - organisation.setTitle(self.title1) - self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='SQLQueue').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title1,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - organisation.activate(activity='SQLQueue').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title2,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) + self.InvokeAndCancelActivity('SQLQueue') - def test_07_InvokeAndCancelRAMQueue(self, quiet=0, run=run_all_test): + def test_07_InvokeAndCancelRAMDict(self, quiet=0, run=run_all_test): # Test if we can add a complete sales order if not run: return if not quiet: - message = '\nTest Invoke And Cancel RAMQueue ' + message = '\nTest Invoke And Cancel RAMDict ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - organisation = portal.organisation._getOb(self.company_id) - organisation.setTitle(self.title1) - self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='RAMQueue').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title1,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - organisation.activate(activity='RAMQueue').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title2,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) + self.InvokeAndCancelActivity('RAMDict') - def test_08_InvokeAndCancelRAMDict(self, quiet=0, run=run_all_test): + def test_08_InvokeAndCancelRAMQueue(self, quiet=0, run=run_all_test): # Test if we can add a complete sales order if not run: return if not quiet: - message = '\nTest Invoke And Cancel RAMDict ' + message = '\nTest Invoke And Cancel RAMQueue ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - organisation = portal.organisation._getOb(self.company_id) - organisation.setTitle(self.title1) - self.assertEquals(self.title1,organisation.getTitle()) - organisation.activate(activity='RAMDict').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title1,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - organisation.activate(activity='RAMDict').setTitle(self.title2) - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle') - # Needed so that the message are removed from the queue - get_transaction().commit() - self.assertEquals(self.title2,organisation.getTitle()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) + self.InvokeAndCancelActivity('RAMQueue') - def test_09_CallOnceWithSQLQueue(self, quiet=0, run=run_all_test): + def test_09_CallOnceWithSQLDict(self, quiet=0, run=run_all_test): + # Test if we call methods only once + if not run: return + if not quiet: + message = '\nCall Once With SQL Dict ' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.CallOnceWithActivity('SQLDict') + + def test_10_CallOnceWithSQLQueue(self, quiet=0, run=run_all_test): # Test if we call methods only once if not run: return if not quiet: message = '\nCall Once With SQL Queue ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - def setFoobar(self): - if hasattr(self,'foobar'): - self.foobar = self.foobar + 1 - else: - self.foobar = 1 - def getFoobar(self): - return (getattr(self,'foobar',0)) - from Products.ERP5Type.Document.Organisation import Organisation - organisation = portal.organisation._getOb(self.company_id) - Organisation.setFoobar = setFoobar - Organisation.getFoobar = getFoobar - organisation.foobar = 0 - organisation.setTitle(self.title1) - self.assertEquals(0,organisation.getFoobar()) - organisation.activate(activity='SQLQueue').setFoobar() - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.distribute() - portal.portal_activities.tic() - self.assertEquals(1,organisation.getFoobar()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - organisation.activate(activity='SQLQueue').setFoobar() - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setFoobar') - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - self.assertEquals(2,organisation.getFoobar()) + self.CallOnceWithActivity('SQLQueue') + + def test_11_CallOnceWithRAMDict(self, quiet=0, run=run_all_test): + # Test if we call methods only once + if not run: return + if not quiet: + message = '\nCall Once With RAM Dict ' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.CallOnceWithActivity('RAMDict') - def test_10_CallOnceWithRAMQueue(self, quiet=0, run=run_all_test): + def test_12_CallOnceWithRAMQueue(self, quiet=0, run=run_all_test): # Test if we call methods only once if not run: return if not quiet: message = '\nCall Once With RAM Queue ' ZopeTestCase._print(message) LOG('Testing... ',0,message) - portal = self.getPortal() - def setFoobar(self): - if hasattr(self,'foobar'): - self.foobar = self.foobar + 1 - else: - self.foobar = 1 - def getFoobar(self): - return (getattr(self,'foobar',0)) - from Products.ERP5Type.Document.Organisation import Organisation - organisation = portal.organisation._getOb(self.company_id) - Organisation.setFoobar = setFoobar - Organisation.getFoobar = getFoobar - organisation.setTitle(self.title1) - organisation.foobar = 0 - self.assertEquals(0,organisation.getFoobar()) - organisation.activate(activity='RAMQueue').setFoobar() - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.distribute() - portal.portal_activities.tic() - self.assertEquals(1,organisation.getFoobar()) - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - organisation.activate(activity='RAMQueue').setFoobar() - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setFoobar') - # Needed so that the message are commited into the queue - get_transaction().commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),0) - self.assertEquals(2,organisation.getFoobar()) + self.CallOnceWithActivity('RAMQueue') -- 2.30.9