Commit 8cf35515 authored by Vincent Pelletier's avatar Vincent Pelletier

testERP5Catalog: Test indexation/unindexation parallelism.

parent 4f58cffa
......@@ -29,6 +29,8 @@
from random import randint
import sys
import threading
import traceback
import unittest
import httplib
from AccessControl import getSecurityManager
......@@ -36,6 +38,7 @@ from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime
from _mysql_exceptions import ProgrammingError
from OFS.ObjectManager import ObjectManager
from Products.CMFActivity import ActivityTool
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList
from Products.PageTemplates.Expressions import getEngine
......@@ -43,6 +46,42 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery
from Testing import ZopeTestCase
from zLOG import LOG
def print_all_stacks():
"""
Equivalent of threading.print_stack which prints stack traces for all threads
in current process, and not just current thread.
"""
for thread_id, frame in sys._current_frames().iteritems():
print 'Thread', thread_id
print ' ', ' '.join(traceback.format_stack(frame))
del frame
class TransactionThread(threading.Thread):
"""
Run payload(portal, **payload_kw) within a separate transaction.
"""
def __init__(self, portal, payload, payload_kw=()):
super(TransactionThread, self).__init__()
self.daemon = True
self.zodb = portal._p_jar.db()
self.root_physical_path = portal.getPhysicalPath()
self.payload = payload
self.payload_kw = payload_kw
def run(self):
# Get a new portal, in a new transactional connection bound to default
# transaction manager (which should be the threaded transaction manager).
portal = self.zodb.open().root()['Application'].unrestrictedTraverse(
self.root_physical_path,
)
# Trigger ERP5Site magic
portal.getSiteManager()
# Trigger skin magic
portal.changeSkin(None)
# Login
newSecurityManager(None, portal.acl_users.getUser('ERP5TypeTestCase'))
self.payload(portal=portal, **dict(self.payload_kw))
class IndexableDocument(ObjectManager):
# this property is required for dummy providesIMovement
......@@ -222,6 +261,71 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.checkRelativeUrlNotInSQLPathList(path_list)
self.tic()
self.checkRelativeUrlNotInSQLPathList(path_list)
# Now delete document while its indexation is running
# (both started and not committed yet).
# First, create a person but do not index it.
person = person_module.newContent(id='4', portal_type='Person')
path_list = [person.getRelativeUrl()]
self.commit()
self.checkRelativeUrlNotInSQLPathList(path_list)
rendez_vous = threading.Event()
unblock_activity = threading.Event()
# Prepare an isolated transaction to act as one activity node.
def runValidablePendingActivities(portal, node_id):
"""
Validate messages once, execute whatever is immediately executable.
"""
activity_tool = portal.portal_activities
activity_tool.distribute()
# XXX: duplicate ActivityTool.tic, without locking as we are being
# multiple activity nodes in a single process.
for activity in ActivityTool.activity_dict.itervalues():
while not activity.dequeueMessage(activity_tool, node_id, ()):
pass
activity_thread = TransactionThread(
portal=self.portal,
payload=runValidablePendingActivities,
payload_kw={'node_id': 2},
)
# Monkey-patch catalog to synchronise between main thread and the
# isolated transaction.
catalog_tool_class = self.portal.portal_catalog.__class__
orig_catalogObjectList = catalog_tool_class.catalogObjectList
def catalogObjectList(*args, **kw):
# Note: rendez-vous *before* modifying tables, otherwise unindexation's
# synchronous catalog alteration will wait on indexation transaction to
# finish, which is prevented until unindexation happened: so a deadlock,
# resolved by a timeout.
rendez_vous.set()
assert unblock_activity.wait(10), print_all_stacks()
orig_catalogObjectList(*args, **kw)
catalog_tool_class.catalogObjectList = catalogObjectList
try:
# Let pending activities (indexation) start.
activity_thread.start()
# Wait until indexation is indeed initiated.
assert rendez_vous.wait(10), print_all_stacks()
# Delete object, which will try to modify catalog content and spawn
# unindexation activity.
person_module.manage_delObjects(ids=['4'])
self.commit()
# Try to run this activity. It should not run, as it must wait on
# indexation to be over.
runValidablePendingActivities(self.portal, 1)
# Let indexation carry on, it is still able to access the object.
unblock_activity.set()
activity_thread.join(10)
assert not activity_thread.is_alive()
finally:
# Un-monkey-patch.
catalog_tool_class.catalogObjectList = orig_catalogObjectList
# Document must be indexed: unindexation must have waited for indexation
# to finish, so runValidablePendingActivities(..., 1) must have been
# a no-op.
self.checkRelativeUrlInSQLPathList(path_list)
self.tic()
# And now it's gone.
self.checkRelativeUrlNotInSQLPathList(path_list)
def test_04_SearchFolderWithDeletedObjects(self):
person_module = self.getPersonModule()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment