Commit 0474c88f authored by Vincent Pelletier's avatar Vincent Pelletier

ProcessingNodeTestCase: Do not retry failed activities.

Including activities which would be allowed to retry later.
parent 2cddee38
......@@ -3,6 +3,11 @@ import base64, errno, os, select, socket, sys, time
from threading import Thread
from UserDict import IterableUserDict
import Lifetime
from AccessControl.SecurityManagement import (
newSecurityManager,
setSecurityManager,
getSecurityManager,
)
import transaction
from Testing import ZopeTestCase
from ZODB.POSException import ConflictError
......@@ -10,7 +15,7 @@ from zLOG import LOG, ERROR
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.ERP5Type.tests.utils import addUserToDeveloperRole
from Products.ERP5Type.tests.utils import createZServer
from Products.CMFActivity.ActivityTool import getCurrentNode
from Products.CMFActivity.ActivityTool import getCurrentNode, Message
class DictPersistentWrapper(IterableUserDict, object):
......@@ -31,6 +36,20 @@ class DictPersistentWrapper(IterableUserDict, object):
self.data = dict
self._persistent_object = persistent_object
class ActivityFailed(RuntimeError):
def __init__(self, activity_list, last_error):
self.activity_list = activity_list
self.last_error = last_error
super(ActivityFailed, self).__init__()
def __str__(self):
return 'tic is looping forever. These messages are pending: %r\n%s' % (
[
('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in self.activity_list
],
self.last_error,
)
def patchActivityTool():
"""Redefine several methods of ActivityTool for unit tests
......@@ -186,6 +205,16 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
transaction.commit()
self._close()
def _getLastError(self):
error_log = self.portal.error_log._getLog()
if len(error_log):
return (
'Last error message:\n'
'%(type)s\n'
'%(value)s\n'
'%(tb_text)s' % error_log[-1]
)
def assertNoPendingMessage(self):
"""Get the last error message from error_log"""
message_list = self.portal.portal_activities.getMessageList()
......@@ -193,11 +222,9 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
error_message = 'These messages are pending: %r' % [
('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in message_list]
error_log = self.portal.error_log._getLog()
if len(error_log):
error_message += '\nLast error message:' \
'\n%(type)s\n%(value)s\n%(tb_text)s' \
% error_log[-1]
last_error = self._getLastError()
if last_error:
error_message += '\n' + last_error
self.fail(error_message)
def abort(self):
......@@ -213,45 +240,58 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
transaction.commit()
# Some tests like testDeferredStyle require that we use self.getPortal()
# instead of self.portal in order to setup current skin.
portal_activities = self.getPortal().portal_activities
portal = self.getPortal()
portal_activities = portal.portal_activities
if verbose:
ZopeTestCase._print('Executing pending activities ...')
old_message_count = 0
start = time.time()
count = 1000
getMessageList = portal_activities.getMessageList
message_list = getMessageList()
message_count = len(message_list)
while message_count and not stop_condition(message_list):
if verbose and old_message_count != message_count:
ZopeTestCase._print(' %i' % message_count)
old_message_count = message_count
portal_activities.process_timer(None, None)
if Lifetime._shutdown_phase:
# XXX CMFActivity contains bare excepts
raise KeyboardInterrupt
message_list = getMessageList()
message_count = len(message_list)
# This prevents an infinite loop.
count -= 1
if not count or message_count and all(x.processing_node == -2
for x in message_list):
# We're about to raise RuntimeError, but maybe we've reached
# the stop condition, so check just once more:
if stop_condition(message_list):
pre_failed_uid_set = {
x.uid
for x in getMessageList()
if x.processing_node < -1
}
portal.changeSkin(None)
old_sm = getSecurityManager()
old_Message_load = Message.load
def Message_load(s, **kw):
"""
Prevent activity retries, as activities must succeed from the first try
in a unit test environment.
This is to catch missing activity dependencies which only work because
activities are being retried until they eventually succeed.
"""
kw['max_retry'] = 0
kw['conflict_retry'] = False
return old_Message_load(s, **kw)
try:
Message.load = staticmethod(Message_load)
newSecurityManager(None, portal.portal_catalog.getWrappedOwner())
while True:
if verbose:
ZopeTestCase._print(' %i' % len(getMessageList()))
# Put everything in the past - hopefully no activity will have been
# pushed that far in the future.
portal_activities.timeShift(30 * VALIDATION_ERROR_DELAY)
portal_activities.distribute()
portal_activities.tic()
self.commit()
message_list = getMessageList()
if not message_list or stop_condition(message_list):
break
error_message = 'tic is looping forever. '
try:
self.assertNoPendingMessage()
except AssertionError, e:
error_message += str(e)
raise RuntimeError(error_message)
# This give some time between messages
if count % 10 == 0:
portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
failed_message_set = [
x
for x in message_list
if x.processing_node < -1
]
if failed_message_set:
raise ActivityFailed(failed_message_set, self._getLastError())
finally:
Message.load = staticmethod(old_Message_load)
setSecurityManager(old_sm)
if verbose:
ZopeTestCase._print(' done (%.3fs)\n' % (time.time() - start))
self.abort()
self.commit()
def afterSetUp(self):
"""Initialize a node that will only process activities"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment