Commit 8aa2d4f4 authored by Sebastien Robin's avatar Sebastien Robin

cleaned CMFActivity unit test and added two more test


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@736 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent b7088a74
......@@ -113,40 +113,40 @@ class TestCMFActivity(ERP5TypeTestCase):
user = uf.getUserById('seb').__of__(uf)
newSecurityManager(None, user)
def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Defered Set Title SQLDict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
def InvokeAndCancelActivity(self, activity):
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='SQLDict').setTitle(self.title2)
organisation.reindexObject()
organisation.activate(activity=activity).setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
portal.portal_activities.distribute()
portal.portal_activities.tic()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity=activity).setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
def test_02_DeferedSetTitleSQLQueue(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Defered Set Title SQLQueue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
def DeferedSetTitleActivity(self, activity):
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='SQLQueue').setTitle(self.title2)
organisation.activate(activity=activity).setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
......@@ -156,26 +156,70 @@ class TestCMFActivity(ERP5TypeTestCase):
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
def test_03_DeferedSetTitleRAMDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Defered Set Title RAMDict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
def CallOnceWithActivity(self, activity):
portal = self.getPortal()
def setFoobar(self):
if hasattr(self,'foobar'):
self.foobar = self.foobar + 1
else:
self.foobar = 1
def getFoobar(self):
return (getattr(self,'foobar',0))
from Products.ERP5Type.Document.Organisation import Organisation
organisation = portal.organisation._getOb(self.company_id)
Organisation.setFoobar = setFoobar
Organisation.getFoobar = getFoobar
organisation.foobar = 0
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='RAMDict').setTitle(self.title2)
self.assertEquals(0,organisation.getFoobar())
organisation.activate(activity=activity).setFoobar()
# Needed so that the message are commited into the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(self.title2,organisation.getTitle())
self.assertEquals(1,organisation.getFoobar())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity=activity).setFoobar()
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setFoobar')
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.assertEquals(2,organisation.getFoobar())
def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Defered Set Title SQLDict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleActivity('SQLDict')
def test_02_DeferedSetTitleSQLQueue(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Defered Set Title SQLQueue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleActivity('SQLQueue')
def test_03_DeferedSetTitleRAMDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Defered Set Title RAMDict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleActivity('RAMDict')
def test_04_DeferedSetTitleRAMQueue(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
......@@ -184,19 +228,7 @@ class TestCMFActivity(ERP5TypeTestCase):
message = '\nTest Defered Set Title RAMQueue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='RAMQueue').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.DeferedSetTitleActivity('RAMQueue')
def test_05_InvokeAndCancelSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
......@@ -205,32 +237,7 @@ class TestCMFActivity(ERP5TypeTestCase):
message = '\nTest Invoke And Cancel SQLDict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='SQLDict').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity='SQLDict').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.InvokeAndCancelActivity('SQLDict')
def test_06_InvokeAndCancelSQLQueue(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
......@@ -239,188 +246,61 @@ class TestCMFActivity(ERP5TypeTestCase):
message = '\nTest Invoke And Cancel SQLQueue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='SQLQueue').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity='SQLQueue').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.InvokeAndCancelActivity('SQLQueue')
def test_07_InvokeAndCancelRAMQueue(self, quiet=0, run=run_all_test):
def test_07_InvokeAndCancelRAMDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Invoke And Cancel RAMQueue '
message = '\nTest Invoke And Cancel RAMDict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='RAMQueue').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity='RAMQueue').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.InvokeAndCancelActivity('RAMDict')
def test_08_InvokeAndCancelRAMDict(self, quiet=0, run=run_all_test):
def test_08_InvokeAndCancelRAMQueue(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
if not quiet:
message = '\nTest Invoke And Cancel RAMDict '
message = '\nTest Invoke And Cancel RAMQueue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity='RAMDict').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageCancel(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity='RAMDict').setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setTitle')
# Needed so that the message are removed from the queue
get_transaction().commit()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.InvokeAndCancelActivity('RAMQueue')
def test_09_CallOnceWithSQLQueue(self, quiet=0, run=run_all_test):
def test_09_CallOnceWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nCall Once With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CallOnceWithActivity('SQLDict')
def test_10_CallOnceWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nCall Once With SQL Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
def setFoobar(self):
if hasattr(self,'foobar'):
self.foobar = self.foobar + 1
else:
self.foobar = 1
def getFoobar(self):
return (getattr(self,'foobar',0))
from Products.ERP5Type.Document.Organisation import Organisation
organisation = portal.organisation._getOb(self.company_id)
Organisation.setFoobar = setFoobar
Organisation.getFoobar = getFoobar
organisation.foobar = 0
organisation.setTitle(self.title1)
self.assertEquals(0,organisation.getFoobar())
organisation.activate(activity='SQLQueue').setFoobar()
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(1,organisation.getFoobar())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity='SQLQueue').setFoobar()
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setFoobar')
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.assertEquals(2,organisation.getFoobar())
self.CallOnceWithActivity('SQLQueue')
def test_11_CallOnceWithRAMDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nCall Once With RAM Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CallOnceWithActivity('RAMDict')
def test_10_CallOnceWithRAMQueue(self, quiet=0, run=run_all_test):
def test_12_CallOnceWithRAMQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nCall Once With RAM Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
portal = self.getPortal()
def setFoobar(self):
if hasattr(self,'foobar'):
self.foobar = self.foobar + 1
else:
self.foobar = 1
def getFoobar(self):
return (getattr(self,'foobar',0))
from Products.ERP5Type.Document.Organisation import Organisation
organisation = portal.organisation._getOb(self.company_id)
Organisation.setFoobar = setFoobar
Organisation.getFoobar = getFoobar
organisation.setTitle(self.title1)
organisation.foobar = 0
self.assertEquals(0,organisation.getFoobar())
organisation.activate(activity='RAMQueue').setFoobar()
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(1,organisation.getFoobar())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.activate(activity='RAMQueue').setFoobar()
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.manageInvoke(organisation.getPhysicalPath(),'setFoobar')
# Needed so that the message are commited into the queue
get_transaction().commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.assertEquals(2,organisation.getFoobar())
self.CallOnceWithActivity('RAMQueue')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment