diff --git a/product/ERP5/tests/testInvalidationBug.py b/product/ERP5/tests/testInvalidationBug.py index 843fbf16e626dd25f44a6320ced5b9018a4d67e2..f0fc470cff2a514b2d9e9066fa2065fa95a7c87d 100644 --- a/product/ERP5/tests/testInvalidationBug.py +++ b/product/ERP5/tests/testInvalidationBug.py @@ -28,15 +28,16 @@ # ############################################################################## -import unittest import os - +import threading +import time +import unittest +import urllib import transaction - +from DateTime import DateTime from Testing import ZopeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.utils import createZODBPythonScript -from DateTime import DateTime class TestInvalidationBug(ERP5TypeTestCase): @@ -91,6 +92,81 @@ class TestInvalidationBug(ERP5TypeTestCase): self.assertEqual(result_list[-2], [1,0]) # catalog self.assertEqual(result_list[-1], [1,1]) # activity tables last + def testLateInvalidationFromZEO(self): + ### Check unit test is run properly + from ZEO.ClientStorage import ClientStorage + storage = self.portal._p_jar._storage + activity_tool = self.portal.portal_activities + node_list = list(activity_tool.getProcessingNodeList()) + node_list.remove(activity_tool.getCurrentNode()) + assert node_list and isinstance(storage, ClientStorage), \ + "this unit test must be run with at least 2 ZEO clients" + + ### Prepare unit test, to minimize amount of work during critical section + ## url to create some content using another zope + new_content_url = "http://ERP5TypeTestCase:@%s%s/newContent" % ( + node_list[0], self.portal.organisation_module.getPath()) + ## prepare freeze/unfreeze of ZEO storage + from asyncore import socket_map + zeo_connection = storage._connection + freeze_lock = threading.Lock() + freeze_lock.acquire() + def unfreezeStorage(): + socket_map[zeo_connection.fileno()] = zeo_connection + # wake up asyncore loop to take the new socket into account + zeo_connection._pull_trigger() + # link to ZEO will be unfrozen 1 second after we read 'message' table + unfreeze_timer = threading.Timer(1, unfreezeStorage) + unfreeze_timer.setDaemon(True) + ## prepare monkey-patches (with code to revert them) + from Products.CMFActivity.Activity.SQLDict import SQLDict + zeo_server = storage._server + def unpatch(): + storage._server = zeo_server + SQLDict.getProcessableMessageList = SQLDict_getProcessableMessageList + SQLDict_getProcessableMessageList = SQLDict.getProcessableMessageList + def getProcessableMessageList(*args, **kw): + result = SQLDict_getProcessableMessageList(*args, **kw) + unpatch() + unfreeze_timer.start() + return result + + ### Perform unit test + ## we should start without any pending activity + self.assertNoPendingMessage() + ## monkey-patch ... + SQLDict.getProcessableMessageList = getProcessableMessageList + try: + # prevent nodes from processing activities automatically + activity_tool.manage_removeFromProcessingList(node_list) + transaction.commit() + del socket_map[zeo_connection.fileno()] + try: + # wake up asyncore loop and wait we really woke up + zeo_connection.trigger.pull_trigger(freeze_lock.release) + freeze_lock.acquire() + # make sure ZODB is not accessed until we get a message to process + storage._server = None + # ... monkey-patch done + ## create object + urllib.urlopen(new_content_url).read() + ## validate reindex activity + activity_tool.distribute() + self.assertEqual(1, len(activity_tool.getMessageList())) + ## reindex created object + activity_tool.tic() + finally: + try: + unfreeze_timer.join() + except RuntimeError: + unfreezeStorage() + finally: + unpatch() + activity_tool.manage_addToProcessingList(node_list) + transaction.commit() + ## When the bug is not fixed, we get a -3 failed activity + self.assertNoPendingMessage() + def _testReindex(self): print("To reproduce bugs easily, distribution step should be skipped for" " SQLDict, by writing messages with processing_node already at 0." diff --git a/product/ERP5Type/tests/ProcessingNodeTestCase.py b/product/ERP5Type/tests/ProcessingNodeTestCase.py index 0c71fcd2415acd1a420bc7de162202c530249286..8526f05558a11b6d7f21a7a51fc9ea8bd44db574 100644 --- a/product/ERP5Type/tests/ProcessingNodeTestCase.py +++ b/product/ERP5Type/tests/ProcessingNodeTestCase.py @@ -113,6 +113,20 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase): else: activity_tool.manage_removeFromProcessingList((currentNode,)) + def assertNoPendingMessage(self): + """Get the last error message from error_log""" + message_list = self.portal.portal_activities.getMessageList() + if message_list: + error_message = 'These messages are pending: %r' % [ + ('/'.join(m.object_path), m.method_id, m.processing_node, m.retry) + for m in message_list] + error_log = self.portal.error_log._getLog() + if len(error_log): + error_message += '\nLast error message:' \ + '\n%(type)s\n%(value)s\n%(tb_text)s' \ + % error_log[-1] + raise self.fail(error_message) + def tic(self, verbose=0): """Execute pending activities""" portal_activities = self.getPortal().portal_activities @@ -136,22 +150,12 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase): # This prevents an infinite loop. count -= 1 if count == 0: - # Get the last error message from error_log. - error_message = '' - error_log = self.portal.error_log._getLog() - if len(error_log): - last_log = error_log[-1] - error_message = '\nLast error message:\n%s\n%s\n%s\n' % ( - last_log['type'], - last_log['value'], - last_log['tb_text'], - ) - raise RuntimeError,\ - 'tic is looping forever. These messages are pending: %r %s' % ( - [('/'.join(m.object_path), m.method_id, m.processing_node, m.retry) - for m in portal_activities.getMessageList()], - error_message - ) + error_message = 'tic is looping forever. ' + try: + self.assertNoPendingMessage() + except AssertionError, e: + error_message += str(e) + raise RuntimeError(error_message) # This give some time between messages if count % 10 == 0: portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)