Commit 75805bb4 authored by Vincent Pelletier's avatar Vincent Pelletier

testERP5Catalog: Test indexation/unindexation parallelism.

parent 48ac021a
......@@ -29,12 +29,15 @@
from random import randint
import sys
import threading
import traceback
import unittest
import httplib
from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime
from OFS.ObjectManager import ObjectManager
from Products.CMFActivity import ActivityTool
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList
from Products.PageTemplates.Expressions import getEngine
......@@ -42,6 +45,42 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery
from Testing import ZopeTestCase
from zLOG import LOG
def print_all_stacks():
"""
Equivalent of threading.print_stack which prints stack traces for all threads
in current process, and not just current thread.
"""
for thread_id, frame in sys._current_frames().iteritems():
print 'Thread', thread_id
print ' ', ' '.join(traceback.format_stack(frame))
del frame
class TransactionThread(threading.Thread):
"""
Run payload(portal, **payload_kw) within a separate transaction.
"""
def __init__(self, portal, payload, payload_kw=()):
super(TransactionThread, self).__init__()
self.daemon = True
self.zodb = portal._p_jar.db()
self.root_physical_path = portal.getPhysicalPath()
self.payload = payload
self.payload_kw = payload_kw
def run(self):
# Get a new portal, in a new transactional connection bound to default
# transaction manager (which should be the threaded transaction manager).
portal = self.zodb.open().root()['Application'].unrestrictedTraverse(
self.root_physical_path,
)
# Trigger ERP5Site magic
portal.getSiteManager()
# Trigger skin magic
portal.changeSkin(None)
# Login
newSecurityManager(None, portal.acl_users.getUser('ERP5TypeTestCase'))
self.payload(portal=portal, **dict(self.payload_kw))
class IndexableDocument(ObjectManager):
# this property is required for dummy providesIMovement
......@@ -221,6 +260,71 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.checkRelativeUrlNotInSQLPathList(path_list)
self.tic()
self.checkRelativeUrlNotInSQLPathList(path_list)
# Now delete document while its indexation is running
# (both started and not committed yet).
# First, create a person but do not index it.
person = person_module.newContent(id='4', portal_type='Person')
path_list = [person.getRelativeUrl()]
self.commit()
self.checkRelativeUrlNotInSQLPathList(path_list)
rendez_vous = threading.Event()
unblock_activity = threading.Event()
# Prepare an isolated transaction to act as one activity node.
def runValidablePendingActivities(portal, node_id):
"""
Validate messages once, execute whatever is immediately executable.
"""
activity_tool = portal.portal_activities
activity_tool.distribute()
# XXX: duplicate ActivityTool.tic, without locking as we are being
# multiple activity nodes in a single process.
for activity in ActivityTool.activity_dict.itervalues():
while not activity.dequeueMessage(activity_tool, node_id, ()):
pass
activity_thread = TransactionThread(
portal=self.portal,
payload=runValidablePendingActivities,
payload_kw={'node_id': 2},
)
# Monkey-patch catalog to synchronise between main thread and the
# isolated transaction.
catalog_tool_class = self.portal.portal_catalog.__class__
orig_catalogObjectList = catalog_tool_class.catalogObjectList
def catalogObjectList(*args, **kw):
# Note: rendez-vous *before* modifying tables, otherwise unindexation's
# synchronous catalog alteration will wait on indexation transaction to
# finish, which is prevented until unindexation happened: so a deadlock,
# resolved by a timeout.
rendez_vous.set()
assert unblock_activity.wait(10), print_all_stacks()
orig_catalogObjectList(*args, **kw)
catalog_tool_class.catalogObjectList = catalogObjectList
try:
# Let pending activities (indexation) start.
activity_thread.start()
# Wait until indexation is indeed initiated.
assert rendez_vous.wait(10), print_all_stacks()
# Delete object, which will try to modify catalog content and spawn
# unindexation activity.
person_module.manage_delObjects(ids=['4'])
self.commit()
# Try to run this activity. It should not run, as it must wait on
# indexation to be over.
runValidablePendingActivities(self.portal, 1)
# Let indexation carry on, it is still able to access the object.
unblock_activity.set()
activity_thread.join(10)
assert not activity_thread.is_alive()
finally:
# Un-monkey-patch.
catalog_tool_class.catalogObjectList = orig_catalogObjectList
# Document must be indexed: unindexation must have waited for indexation
# to finish, so runValidablePendingActivities(..., 1) must have been
# a no-op.
self.checkRelativeUrlInSQLPathList(path_list)
self.tic()
# And now it's gone.
self.checkRelativeUrlNotInSQLPathList(path_list)
def test_04_SearchFolderWithDeletedObjects(self):
person_module = self.getPersonModule()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment