Commit 9786c47d authored by Julien Muchembled's avatar Julien Muchembled Committed by Cédric Le Ninivin

CMFActivity: more test clean-up

parent bb07e0f4
...@@ -34,17 +34,18 @@ from Testing import ZopeTestCase ...@@ -34,17 +34,18 @@ from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript from Products.ERP5Type.tests.utils import createZODBPythonScript
from Products.ERP5Type.Base import Base from Products.ERP5Type.Base import Base
from Products.CMFActivity import ActivityTool
from Products.CMFActivity.Activity.SQLBase import INVOKE_ERROR_STATE from Products.CMFActivity.Activity.SQLBase import INVOKE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.CMFActivity.Activity.SQLDict import SQLDict from Products.CMFActivity.Activity.SQLDict import SQLDict
import Products.CMFActivity.ActivityTool
from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
from erp5.portal_type import Organisation from erp5.portal_type import Organisation
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager
from zLOG import LOG from zLOG import LOG
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from DateTime import DateTime from DateTime import DateTime
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import (
cancelProcessShutdown, Message, getCurrentNode, getServerAddress)
from _mysql_exceptions import OperationalError from _mysql_exceptions import OperationalError
from Products.ZMySQLDA.db import DB from Products.ZMySQLDA.db import DB
from sklearn.externals.joblib.hashing import hash as joblib_hash from sklearn.externals.joblib.hashing import hash as joblib_hash
...@@ -53,7 +54,6 @@ import random ...@@ -53,7 +54,6 @@ import random
import threading import threading
import weakref import weakref
import transaction import transaction
from Products.CMFActivity.ActivityTool import getCurrentNode, getServerAddress
from App.config import getConfiguration from App.config import getConfiguration
from asyncore import socket_map from asyncore import socket_map
import socket import socket
...@@ -562,7 +562,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -562,7 +562,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Queue to induce conflict errors artificially. # Monkey patch Queue to induce conflict errors artificially.
def validate(self, *args, **kwargs): def validate(self, *args, **kwargs):
from Products.CMFActivity.Activity.Queue import Queue
if Queue.current_num_conflict_errors < Queue.conflict_errors_limit: if Queue.current_num_conflict_errors < Queue.conflict_errors_limit:
Queue.current_num_conflict_errors += 1 Queue.current_num_conflict_errors += 1
# LOG('TryConflictErrorsWhileValidating', 0, 'causing a conflict error artificially') # LOG('TryConflictErrorsWhileValidating', 0, 'causing a conflict error artificially')
...@@ -910,7 +909,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -910,7 +909,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
title=original_title) title=original_title)
# Monkey patch Organisation to add a failing method # Monkey patch Organisation to add a failing method
def failingMethod(self): def failingMethod(self):
raise ValueError, 'This method always fail' raise ValueError('This method always fail')
Organisation.failingMethod = failingMethod Organisation.failingMethod = failingMethod
activity_list = ['SQLQueue', 'SQLDict', 'SQLJoblib'] activity_list = ['SQLQueue', 'SQLDict', 'SQLJoblib']
...@@ -969,7 +968,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -969,7 +968,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Organisation to add a failing method # Monkey patch Organisation to add a failing method
def failingMethod(self): def failingMethod(self):
raise ValueError, 'This method always fail' raise ValueError('This method always fail')
Organisation.failingMethod = failingMethod Organisation.failingMethod = failingMethod
# First, index the object. # First, index the object.
...@@ -1276,7 +1275,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1276,7 +1275,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
""" """
Test that the loss of volatile attribute doesn't loose activities Test that the loss of volatile attribute doesn't loose activities
""" """
self.tic()
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
message_list = activity_tool.getMessageList() message_list = activity_tool.getMessageList()
self.assertEqual(len(message_list), 0) self.assertEqual(len(message_list), 0)
...@@ -1306,14 +1304,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1306,14 +1304,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Queues supporting message batch processing: Queues supporting message batch processing:
- SQLQueue - SQLQueue
""" """
self.tic()
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
obj = self.portal.organisation_module.newContent(portal_type='Organisation') obj = self.portal.organisation_module.newContent(portal_type='Organisation')
active_obj = obj.activate(activity='SQLQueue') active_obj = obj.activate(activity='SQLQueue')
def appendToTitle(self, to_append, fail=False): def appendToTitle(self, to_append, fail=False):
self.setTitle(self.getTitle() + to_append) self.setTitle(self.getTitle() + to_append)
if fail: if fail:
raise ValueError, 'This method always fail' raise ValueError('This method always fail')
try: try:
Organisation.appendToTitle = appendToTitle Organisation.appendToTitle = appendToTitle
obj.setTitle('a') obj.setTitle('a')
...@@ -1337,7 +1334,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1337,7 +1334,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
This only apply to queues supporting batch processing: This only apply to queues supporting batch processing:
- SQLQueue - SQLQueue
""" """
self.tic()
obj = self.portal.organisation_module.newContent(portal_type='Organisation', title='Pending') obj = self.portal.organisation_module.newContent(portal_type='Organisation', title='Pending')
marker_id = 'marker_%i' % (random.randint(1, 10), ) marker_id = 'marker_%i' % (random.randint(1, 10), )
def putMarkerValue(self, marker_id): def putMarkerValue(self, marker_id):
...@@ -1409,16 +1405,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1409,16 +1405,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.TryUserNotificationOnActivityFailure('SQLQueue') self.TryUserNotificationOnActivityFailure('SQLQueue')
def TryUserNotificationRaise(self, activity): def TryUserNotificationRaise(self, activity):
self.tic()
obj = self.portal.organisation_module.newContent(portal_type='Organisation') obj = self.portal.organisation_module.newContent(portal_type='Organisation')
self.tic() self.tic()
from Products.CMFActivity.ActivityTool import Message
original_notifyUser = Message.notifyUser original_notifyUser = Message.notifyUser
def failingMethod(self, *args, **kw): def failingMethod(self, *args, **kw):
raise ValueError, 'This method always fail' raise ValueError('This method always fail')
Message.notifyUser = failingMethod Message.notifyUser = failingMethod
Organisation.failingMethod = failingMethod Organisation.failingMethod = failingMethod
getMessageList = self.getPortalObject().portal_activities.getMessageList getMessageList = self.portal.portal_activities.getMessageList
try: try:
obj.activate(activity=activity, priority=6).failingMethod() obj.activate(activity=activity, priority=6).failingMethod()
self.commit() self.commit()
...@@ -1453,7 +1447,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1453,7 +1447,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
error be raised in tpc_vote) does not cause activity connection to error be raised in tpc_vote) does not cause activity connection to
stall. stall.
""" """
self.tic()
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
try: try:
Organisation.registerFailingTransactionManager = registerFailingTransactionManager Organisation.registerFailingTransactionManager = registerFailingTransactionManager
...@@ -1465,7 +1458,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1465,7 +1458,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.flushAllActivities(silent=1, loop_size=100) self.flushAllActivities(silent=1, loop_size=100)
self.commit() self.commit()
# Check that cmf_activity SQL connection still works # Check that cmf_activity SQL connection still works
connection_da = self.getPortalObject().cmf_activity_sql_connection() connection_da = self.portal.cmf_activity_sql_connection()
self.assertFalse(connection_da._registered) self.assertFalse(connection_da._registered)
connection_da.query('select 1') connection_da.query('select 1')
self.assertTrue(connection_da._registered) self.assertTrue(connection_da._registered)
...@@ -1483,7 +1476,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1483,7 +1476,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def TryActivityRaiseInCommitDoesNotLooseMessages(self, activity): def TryActivityRaiseInCommitDoesNotLooseMessages(self, activity):
""" """
""" """
self.tic()
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
try: try:
Organisation.registerFailingTransactionManager = registerFailingTransactionManager Organisation.registerFailingTransactionManager = registerFailingTransactionManager
...@@ -1505,7 +1497,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1505,7 +1497,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.TryActivityRaiseInCommitDoesNotLooseMessages('SQLQueue') self.TryActivityRaiseInCommitDoesNotLooseMessages('SQLQueue')
def TryChangeSkinInActivity(self, activity): def TryChangeSkinInActivity(self, activity):
self.tic()
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
def changeSkinToNone(self): def changeSkinToNone(self):
self.getPortalObject().changeSkin(None) self.getPortalObject().changeSkin(None)
...@@ -1568,7 +1559,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1568,7 +1559,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Deduplication is cheap: Deduplication is cheap:
- inside the transaction which spawned duplicate activities, because it - inside the transaction which spawned duplicate activities, because it
has to have created activities around anyway, and can keep track has to have created activities around anyway, and can keep track
- inside the CMFActvitiy-level processing surrounding activity execution - inside the CMFActivity-level processing surrounding activity execution
because it has to load the activities to process them anyway because it has to load the activities to process them anyway
""" """
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
...@@ -1868,7 +1859,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1868,7 +1859,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def first(context): def first(context):
context.changeSkin(skin_selection_name) context.changeSkin(skin_selection_name)
if getattr(context, script_id, None) is not None: if getattr(context, script_id, None) is not None:
raise Exception, '%s is not supposed to be found here.' % (script_id, ) raise Exception('%s is not supposed to be found here.' % script_id)
def second(context): def second(context):
# If the wrong skin is selected this will raise. # If the wrong skin is selected this will raise.
getattr(context, script_id) getattr(context, script_id)
...@@ -1919,7 +1910,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1919,7 +1910,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
rendez_vous_event.set() rendez_vous_event.set()
# When this event is available, it means test has called process_shutdown. # When this event is available, it means test has called process_shutdown.
activity_event.wait() activity_event.wait()
from Products.CMFActivity.Activity.SQLDict import SQLDict
original_dequeue = SQLDict.dequeueMessage original_dequeue = SQLDict.dequeueMessage
queue_tic_test_dict = {} queue_tic_test_dict = {}
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
...@@ -1983,7 +1973,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1983,7 +1973,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(activity_tool.getMessageList()), 1) self.assertEqual(len(activity_tool.getMessageList()), 1)
finally: finally:
# Put activity tool back in a working state # Put activity tool back in a working state
from Products.CMFActivity.ActivityTool import cancelProcessShutdown
try: try:
cancelProcessShutdown() cancelProcessShutdown()
except StandardException: except StandardException:
...@@ -2151,11 +2140,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2151,11 +2140,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
then serialization tag guarantees that only one of the same serialization then serialization tag guarantees that only one of the same serialization
tagged activities can be processed at the same time. tagged activities can be processed at the same time.
""" """
from Products.CMFActivity import ActivityTool
portal = self.portal portal = self.portal
activity_tool = portal.portal_activities activity_tool = portal.portal_activities
self.tic()
# Add 6 activities # Add 6 activities
portal.organisation_module.activate(activity='SQLDict', tag='', serialization_tag='test_115').getId() portal.organisation_module.activate(activity='SQLDict', tag='', serialization_tag='test_115').getId()
...@@ -2175,7 +2161,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2175,7 +2161,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.distribute() activity_tool.distribute()
self.commit() self.commit()
from Products.CMFActivity import ActivityTool
activity = ActivityTool.activity_dict['SQLDict'] activity = ActivityTool.activity_dict['SQLDict']
activity.getProcessableMessageList(activity_tool, 1) activity.getProcessableMessageList(activity_tool, 1)
self.commit() self.commit()
...@@ -2212,8 +2197,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2212,8 +2197,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def test_116_RaiseInCommitBeforeMessageExecution(self): def test_116_RaiseInCommitBeforeMessageExecution(self):
""" """
Test behaviour of CMFActivity when the commit just before message Test behaviour of CMFActivity when the commit just before message
execution fails. In particular, CMFActivity should restart the execution fails. In particular, it should restart the messages it
activities it selected (processing=1) instead of ignoring them forever. selected (processing_node=current_node) instead of ignoring them forever.
""" """
processed = [] processed = []
activity_tool = self.portal.portal_activities activity_tool = self.portal.portal_activities
...@@ -2228,7 +2213,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2228,7 +2213,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Normally, the request stops here and Zope aborts the transaction # Normally, the request stops here and Zope aborts the transaction
self.abort() self.abort()
self.assertEqual(processed, []) self.assertEqual(processed, [])
# Activity is already in 'processing=1' state. Check tic reselects it. # Activity is already reserved for current node. Check tic reselects it.
activity_tool.tic() activity_tool.tic()
self.assertEqual(processed, [activity]) self.assertEqual(processed, [activity])
del processed[:] del processed[:]
...@@ -2275,12 +2260,11 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2275,12 +2260,11 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def TryNotificationSavedOnEventLogWhenNotifyUserRaises(self, activity): def TryNotificationSavedOnEventLogWhenNotifyUserRaises(self, activity):
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
self.tic()
obj = self.portal.organisation_module.newContent(portal_type='Organisation') obj = self.portal.organisation_module.newContent(portal_type='Organisation')
self.tic() self.tic()
original_notifyUser = Message.notifyUser.im_func original_notifyUser = Message.notifyUser.im_func
def failSendingEmail(self, *args, **kw): def failSendingEmail(self, *args, **kw):
raise MailHostError, 'Mail is not sent' raise MailHostError('Mail is not sent')
activity_unit_test_error = Exception() activity_unit_test_error = Exception()
def failingMethod(self): def failingMethod(self):
raise activity_unit_test_error raise activity_unit_test_error
...@@ -2325,7 +2309,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2325,7 +2309,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool = self.getActivityTool() activity_tool = self.getActivityTool()
# With Message.__call__ # With Message.__call__
# 1: activity context does not exist when activity is executed # 1: activity context does not exist when activity is executed
self.tic()
obj = self.portal.organisation_module.newContent(portal_type='Organisation') obj = self.portal.organisation_module.newContent(portal_type='Organisation')
self.tic() self.tic()
notification_done = [] notification_done = []
...@@ -2334,7 +2317,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2334,7 +2317,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.traceback = None self.traceback = None
original_notifyUser = Message.notifyUser original_notifyUser = Message.notifyUser
def failingMethod(self): def failingMethod(self):
raise ValueError, "This method always fail" raise ValueError("This method always fail")
Message.notifyUser = fake_notifyUser Message.notifyUser = fake_notifyUser
Organisation.failingMethod = failingMethod Organisation.failingMethod = failingMethod
try: try:
...@@ -2845,7 +2828,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2845,7 +2828,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
skin.manage_delObjects([script_id]) skin.manage_delObjects([script_id])
self.tic() self.tic()
def testGetCurrentNode(self): def test_getCurrentNode(self):
current_node = getattr(getConfiguration(), 'product_config', {}) \ current_node = getattr(getConfiguration(), 'product_config', {}) \
.get('cmfactivity', {}).get('node-id') .get('cmfactivity', {}).get('node-id')
if not current_node: if not current_node:
...@@ -2855,7 +2838,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2855,7 +2838,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_node = self.portal.portal_activities.getCurrentNode() activity_node = self.portal.portal_activities.getCurrentNode()
self.assertEqual(activity_node, current_node) self.assertEqual(activity_node, current_node)
def testGetServerAddress(self): def test_getServerAddress(self):
ip = port = '' ip = port = ''
for k, v in socket_map.items(): for k, v in socket_map.items():
if hasattr(v, 'addr'): if hasattr(v, 'addr'):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment