Commit 8b27e95d authored by Julien Muchembled's avatar Julien Muchembled

Add unit test to check that invalidations sent by ZEO are processed in time

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@35687 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent fd5601b6
......@@ -28,15 +28,16 @@
#
##############################################################################
import unittest
import os
import threading
import time
import unittest
import urllib
import transaction
from DateTime import DateTime
from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from DateTime import DateTime
class TestInvalidationBug(ERP5TypeTestCase):
......@@ -91,6 +92,81 @@ class TestInvalidationBug(ERP5TypeTestCase):
self.assertEqual(result_list[-2], [1,0]) # catalog
self.assertEqual(result_list[-1], [1,1]) # activity tables last
def testLateInvalidationFromZEO(self):
### Check unit test is run properly
from ZEO.ClientStorage import ClientStorage
storage = self.portal._p_jar._storage
activity_tool = self.portal.portal_activities
node_list = list(activity_tool.getProcessingNodeList())
node_list.remove(activity_tool.getCurrentNode())
assert node_list and isinstance(storage, ClientStorage), \
"this unit test must be run with at least 2 ZEO clients"
### Prepare unit test, to minimize amount of work during critical section
## url to create some content using another zope
new_content_url = "http://ERP5TypeTestCase:@%s%s/newContent" % (
node_list[0], self.portal.organisation_module.getPath())
## prepare freeze/unfreeze of ZEO storage
from asyncore import socket_map
zeo_connection = storage._connection
freeze_lock = threading.Lock()
freeze_lock.acquire()
def unfreezeStorage():
socket_map[zeo_connection.fileno()] = zeo_connection
# wake up asyncore loop to take the new socket into account
zeo_connection._pull_trigger()
# link to ZEO will be unfrozen 1 second after we read 'message' table
unfreeze_timer = threading.Timer(1, unfreezeStorage)
unfreeze_timer.setDaemon(True)
## prepare monkey-patches (with code to revert them)
from Products.CMFActivity.Activity.SQLDict import SQLDict
zeo_server = storage._server
def unpatch():
storage._server = zeo_server
SQLDict.getProcessableMessageList = SQLDict_getProcessableMessageList
SQLDict_getProcessableMessageList = SQLDict.getProcessableMessageList
def getProcessableMessageList(*args, **kw):
result = SQLDict_getProcessableMessageList(*args, **kw)
unpatch()
unfreeze_timer.start()
return result
### Perform unit test
## we should start without any pending activity
self.assertNoPendingMessage()
## monkey-patch ...
SQLDict.getProcessableMessageList = getProcessableMessageList
try:
# prevent nodes from processing activities automatically
activity_tool.manage_removeFromProcessingList(node_list)
transaction.commit()
del socket_map[zeo_connection.fileno()]
try:
# wake up asyncore loop and wait we really woke up
zeo_connection.trigger.pull_trigger(freeze_lock.release)
freeze_lock.acquire()
# make sure ZODB is not accessed until we get a message to process
storage._server = None
# ... monkey-patch done
## create object
urllib.urlopen(new_content_url).read()
## validate reindex activity
activity_tool.distribute()
self.assertEqual(1, len(activity_tool.getMessageList()))
## reindex created object
activity_tool.tic()
finally:
try:
unfreeze_timer.join()
except RuntimeError:
unfreezeStorage()
finally:
unpatch()
activity_tool.manage_addToProcessingList(node_list)
transaction.commit()
## When the bug is not fixed, we get a -3 failed activity
self.assertNoPendingMessage()
def _testReindex(self):
print("To reproduce bugs easily, distribution step should be skipped for"
" SQLDict, by writing messages with processing_node already at 0."
......
......@@ -113,6 +113,20 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
else:
activity_tool.manage_removeFromProcessingList((currentNode,))
def assertNoPendingMessage(self):
"""Get the last error message from error_log"""
message_list = self.portal.portal_activities.getMessageList()
if message_list:
error_message = 'These messages are pending: %r' % [
('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in message_list]
error_log = self.portal.error_log._getLog()
if len(error_log):
error_message += '\nLast error message:' \
'\n%(type)s\n%(value)s\n%(tb_text)s' \
% error_log[-1]
raise self.fail(error_message)
def tic(self, verbose=0):
"""Execute pending activities"""
portal_activities = self.getPortal().portal_activities
......@@ -136,22 +150,12 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
# This prevents an infinite loop.
count -= 1
if count == 0:
# Get the last error message from error_log.
error_message = ''
error_log = self.portal.error_log._getLog()
if len(error_log):
last_log = error_log[-1]
error_message = '\nLast error message:\n%s\n%s\n%s\n' % (
last_log['type'],
last_log['value'],
last_log['tb_text'],
)
raise RuntimeError,\
'tic is looping forever. These messages are pending: %r %s' % (
[('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in portal_activities.getMessageList()],
error_message
)
error_message = 'tic is looping forever. '
try:
self.assertNoPendingMessage()
except AssertionError, e:
error_message += str(e)
raise RuntimeError(error_message)
# This give some time between messages
if count % 10 == 0:
portal_activities.timeShift(3 * VALIDATION_ERROR_DELAY)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment