testERP5Catalog: Test indexation/unindexation parallelism.
... | ... | @@ -29,12 +29,15 @@ |
from random import randint | ||
import sys | ||
import threading | ||
import traceback | ||
import unittest | ||
import httplib | ||
from AccessControl import getSecurityManager | ||
from AccessControl.SecurityManagement import newSecurityManager | ||
from DateTime import DateTime | ||
from OFS.ObjectManager import ObjectManager | ||
from Products.CMFActivity import ActivityTool | ||
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase | ||
from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList | ||
from Products.PageTemplates.Expressions import getEngine | ||
... | ... | @@ -42,6 +45,42 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery |
from Testing import ZopeTestCase | ||
from zLOG import LOG | ||
def print_all_stacks(): | ||
""" | ||
Equivalent of threading.print_stack which prints stack traces for all threads | ||
in current process, and not just current thread. | ||
""" | ||
for thread_id, frame in sys._current_frames().iteritems(): | ||
print 'Thread', thread_id | ||
print ' ', ' '.join(traceback.format_stack(frame)) | ||
del frame | ||
class TransactionThread(threading.Thread): | ||
""" | ||
Run payload(portal, **payload_kw) within a separate transaction. | ||
""" | ||
def __init__(self, portal, payload, payload_kw=()): | ||
super(TransactionThread, self).__init__() | ||
self.daemon = True | ||
self.zodb = portal._p_jar.db() | ||
self.root_physical_path = portal.getPhysicalPath() | ||
self.payload = payload | ||
self.payload_kw = payload_kw | ||
def run(self): | ||
# Get a new portal, in a new transactional connection bound to default | ||
# transaction manager (which should be the threaded transaction manager). | ||
portal = self.zodb.open().root()['Application'].unrestrictedTraverse( | ||
self.root_physical_path, | ||
) | ||
# Trigger ERP5Site magic | ||
portal.getSiteManager() | ||
# Trigger skin magic | ||
portal.changeSkin(None) | ||
# Login | ||
newSecurityManager(None, portal.acl_users.getUser('ERP5TypeTestCase')) | ||
self.payload(portal=portal, **dict(self.payload_kw)) | ||
class IndexableDocument(ObjectManager): | ||
# this property is required for dummy providesIMovement | ||
... | ... | @@ -221,6 +260,71 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): |
self.checkRelativeUrlNotInSQLPathList(path_list) | ||
self.tic() | ||
self.checkRelativeUrlNotInSQLPathList(path_list) | ||
# Now delete document while its indexation is running | ||
# (both started and not committed yet). | ||
# First, create a person but do not index it. | ||
person = person_module.newContent(id='4', portal_type='Person') | ||
path_list = [person.getRelativeUrl()] | ||
self.commit() | ||
self.checkRelativeUrlNotInSQLPathList(path_list) | ||
rendez_vous = threading.Event() | ||
unblock_activity = threading.Event() | ||
# Prepare an isolated transaction to act as one activity node. | ||
def runValidablePendingActivities(portal, node_id): | ||
""" | ||
Validate messages once, execute whatever is immediately executable. | ||
""" | ||
activity_tool = portal.portal_activities | ||
activity_tool.distribute() | ||
# XXX: duplicate ActivityTool.tic, without locking as we are being | ||
# multiple activity nodes in a single process. | ||
for activity in ActivityTool.activity_dict.itervalues(): | ||
while not activity.dequeueMessage(activity_tool, node_id, ()): | ||
pass | ||
activity_thread = TransactionThread( | ||
portal=self.portal, | ||
payload=runValidablePendingActivities, | ||
payload_kw={'node_id': 2}, | ||
) | ||
# Monkey-patch catalog to synchronise between main thread and the | ||
# isolated transaction. | ||
catalog_tool_class = self.portal.portal_catalog.__class__ | ||
orig_catalogObjectList = catalog_tool_class.catalogObjectList | ||
def catalogObjectList(*args, **kw): | ||
# Note: rendez-vous *before* modifying tables, otherwise unindexation's | ||
# synchronous catalog alteration will wait on indexation transaction to | ||
# finish, which is prevented until unindexation happened: so a deadlock, | ||
# resolved by a timeout. | ||
rendez_vous.set() | ||
assert unblock_activity.wait(10), print_all_stacks() | ||
|
||
orig_catalogObjectList(*args, **kw) | ||
catalog_tool_class.catalogObjectList = catalogObjectList | ||
try: | ||
# Let pending activities (indexation) start. | ||
activity_thread.start() | ||
# Wait until indexation is indeed initiated. | ||
assert rendez_vous.wait(10), print_all_stacks() | ||
# Delete object, which will try to modify catalog content and spawn | ||
# unindexation activity. | ||
person_module.manage_delObjects(ids=['4']) | ||
self.commit() | ||
# Try to run this activity. It should not run, as it must wait on | ||
# indexation to be over. | ||
runValidablePendingActivities(self.portal, 1) | ||
# Let indexation carry on, it is still able to access the object. | ||
unblock_activity.set() | ||
activity_thread.join(10) | ||
assert not activity_thread.is_alive() | ||
finally: | ||
# Un-monkey-patch. | ||
catalog_tool_class.catalogObjectList = orig_catalogObjectList | ||
# Document must be indexed: unindexation must have waited for indexation | ||
# to finish, so runValidablePendingActivities(..., 1) must have been | ||
# a no-op. | ||
self.checkRelativeUrlInSQLPathList(path_list) | ||
self.tic() | ||
# And now it's gone. | ||
self.checkRelativeUrlNotInSQLPathList(path_list) | ||
def test_04_SearchFolderWithDeletedObjects(self): | ||
person_module = self.getPersonModule() | ||
... | ... |