Commit 820d2fd1 authored by Vincent Pelletier's avatar Vincent Pelletier

EPR5Type.CopySupport: Only delete activities for deleted document which can be safely deleted.

Safe activities are:
- those which are not visible outside current transaction (IOW, queued
  in activity buffer)
- those which are already marked as failed, as no activity node will try
  to process these
Other activities may already be running.
Note: deleting even failed activities may violate activity dependencies
(other activities may be waiting on the ones being deleted, even those
not spawned yet), but if this is an issue then some other code should
take care of these as well.
Also, rework a CMFActivity test checking that activties already-spawned on
a later-deleted context get discarded instead of being executed: previous
implementation was working around the activity deletion removed in this commit.
Also, test indexation/unindexation parallelism.
parent 06aee0c2
...@@ -806,7 +806,7 @@ CREATE TABLE %s ( ...@@ -806,7 +806,7 @@ CREATE TABLE %s (
self._log(WARNING, self._log(WARNING,
'Exception during notification phase of finalizeMessageExecution') 'Exception during notification phase of finalizeMessageExecution')
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, only_safe=False, **kw):
""" """
object_path is a tuple object_path is a tuple
""" """
...@@ -843,6 +843,8 @@ CREATE TABLE %s ( ...@@ -843,6 +843,8 @@ CREATE TABLE %s (
db = activity_tool.getSQLConnection() db = activity_tool.getSQLConnection()
for line in self._getMessageList(db, path=path, for line in self._getMessageList(db, path=path,
**({'method_id': method_id} if method_id else {})): **({'method_id': method_id} if method_id else {})):
if only_safe and line.processing_node > -2:
continue
uid_list.append(line.uid) uid_list.append(line.uid)
if invoke and line.processing_node <= 0: if invoke and line.processing_node <= 0:
invoke(Message.load(line.message, uid=line.uid, line=line)) invoke(Message.load(line.message, uid=line.uid, line=line))
......
...@@ -1125,6 +1125,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1125,6 +1125,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
def testTryUserNotificationOnActivityFailure(self, activity): def testTryUserNotificationOnActivityFailure(self, activity):
message_list = self.portal.MailHost._message_list message_list = self.portal.MailHost._message_list
del message_list[:] del message_list[:]
portal_activities = self.portal.portal_activities
countMessage = portal_activities.countMessage
obj = self.portal.organisation_module.newContent(portal_type='Organisation') obj = self.portal.organisation_module.newContent(portal_type='Organisation')
self.tic() self.tic()
def failingMethod(self): raise ValueError('This method always fails') def failingMethod(self): raise ValueError('This method always fails')
...@@ -1140,13 +1142,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1140,13 +1142,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertIn("Module %s, line %s, in failingMethod" % ( self.assertIn("Module %s, line %s, in failingMethod" % (
__name__, inspect.getsourcelines(failingMethod)[1]), mail) __name__, inspect.getsourcelines(failingMethod)[1]), mail)
self.assertIn("ValueError:", mail) self.assertIn("ValueError:", mail)
portal_activities.manageClearActivities()
# MESSAGE_NOT_EXECUTABLE # MESSAGE_NOT_EXECUTABLE
obj.getParentValue()._delObject(obj.getId()) obj_path = obj.getPath()
obj.activate(activity=activity).failingMethod() obj.activate(activity=activity).failingMethod()
self.commit() self.commit()
self.assertTrue(obj.hasActivity()) obj.getParentValue()._delObject(obj.getId())
self.commit()
self.assertGreater(countMessage(path=obj_path), 0)
self.tic() self.tic()
self.assertFalse(obj.hasActivity()) self.assertEqual(countMessage(path=obj_path), 0)
self.assertFalse(message_list) self.assertFalse(message_list)
finally: finally:
del Organisation.failingMethod del Organisation.failingMethod
......
...@@ -27,16 +27,21 @@ ...@@ -27,16 +27,21 @@
# #
############################################################################## ##############################################################################
from functools import partial
import httplib
from random import randint from random import randint
import sys import sys
import threading
import traceback
import unittest import unittest
import httplib import six
from AccessControl import getSecurityManager from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager
from Acquisition import aq_base from Acquisition import aq_base
from DateTime import DateTime from DateTime import DateTime
from _mysql_exceptions import ProgrammingError from _mysql_exceptions import ProgrammingError
from OFS.ObjectManager import ObjectManager from OFS.ObjectManager import ObjectManager
from Products.CMFActivity import ActivityTool
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList
from Products.PageTemplates.Expressions import getEngine from Products.PageTemplates.Expressions import getEngine
...@@ -44,6 +49,100 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery ...@@ -44,6 +49,100 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery
from Testing import ZopeTestCase from Testing import ZopeTestCase
from zLOG import LOG from zLOG import LOG
def format_stack(thread=None):
frame_dict = sys._current_frames()
if thread is not None:
thread_id = thread.ident
frame_dict = {
thread_id: frame_dict[thread_id],
}
frame = None
try:
return ''.join((
'Thread %s\n %s' % (
thread_id,
' '.join(traceback.format_stack(frame)),
)
for thread_id, frame in frame_dict.iteritems()
))
finally:
del frame, frame_dict
class TransactionThread(threading.Thread):
"""
Run payload(portal_value=portal_value) within a separate transaction.
Note: because of transaction isolation, given portal_value will be a
different instance of the same persistent object.
Instances of this class may be used as a context manager to manage thread
lifespam, especially to be properly informed of any exception which happened
during thread's life. In which case, join_timeout is used upon context exit.
"""
def __init__(self, portal_value, payload, join_timeout=10):
super(TransactionThread, self).__init__()
self.daemon = True
self.zodb = portal_value._p_jar.db()
self.root_physical_path = portal_value.getPhysicalPath()
self.payload = payload
self.exception = None
self.join_timeout = join_timeout
def run(self):
try:
# Get a new portal, in a new transactional connection bound to default
# transaction manager (which should be the threaded transaction manager).
portal_value = self.zodb.open().root()['Application'].unrestrictedTraverse(
self.root_physical_path,
)
# Trigger ERP5Site magic
portal_value.getSiteManager()
# Trigger skin magic
portal_value.changeSkin(None)
# Login
newSecurityManager(None, portal_value.acl_users.getUser('ERP5TypeTestCase'))
self.payload(portal_value=portal_value)
except Exception as self.exception:
if six.PY2:
self.exception.__traceback__ = sys.exc_info()[2]
def join(self, *args, **kw):
super(TransactionThread, self).join(*args, **kw)
if not self.is_alive():
exception = self.exception
# Break reference cycle:
# run frame -> self -> exception -> __traceback__ -> run frame
# Not re-raising on subsequent calls is kind of a bug, but it's really up
# to caller to either not ignore exceptions or keep them around.
self.exception = None
if exception is not None:
if six.PY3:
raise exception
six.reraise(exception, None, exception.__traceback__)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.join(self.join_timeout)
# Note: if context was interrupted by an exception, being unable to join
# the thread may be unavoidable (ex: context could not signal blocked
# thread), in which case this assertion will be mostly useless noise.
# But conditionally ignoring this seems worse.
assert not self.is_alive(), format_stack(self)
except Exception as join_exc_val:
if exc_val is None:
# No exception from context: just propagate exception
raise
# Both an exception from context and an exception in thread
if six.PY3:
# PY3: "raise join_exc_val from exc_val"
six.raise_from(join_exc_val, exc_val)
# PY2, handle our exception ourselves and let interpreter reraise
# context's.
traceback.print_exc()
class IndexableDocument(ObjectManager): class IndexableDocument(ObjectManager):
# This tests uses a simple ObjectManager, but ERP5Catalog only # This tests uses a simple ObjectManager, but ERP5Catalog only
# support classes inherting from ERP5Type.Base. # support classes inherting from ERP5Type.Base.
...@@ -230,6 +329,66 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -230,6 +329,66 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.checkRelativeUrlNotInSQLPathList(path_list) self.checkRelativeUrlNotInSQLPathList(path_list)
self.tic() self.tic()
self.checkRelativeUrlNotInSQLPathList(path_list) self.checkRelativeUrlNotInSQLPathList(path_list)
# Now delete document while its indexation is running
# (both started and not committed yet).
# First, create a person but do not index it.
person = person_module.newContent(id='4', portal_type='Person')
path_list = [person.getRelativeUrl()]
self.commit()
self.checkRelativeUrlNotInSQLPathList(path_list)
rendez_vous = threading.Event()
unblock_activity = threading.Event()
# Prepare an isolated transaction to act as one activity node.
def runValidablePendingActivities(portal_value, node_id):
"""
Validate messages once, execute whatever is immediately executable.
"""
activity_tool = portal_value.portal_activities
activity_tool.distribute()
# XXX: duplicate ActivityTool.tic, without locking as we are being
# multiple activity nodes in a single process.
for activity in ActivityTool.activity_dict.itervalues():
while not activity.dequeueMessage(activity_tool, node_id, ()):
pass
# Monkey-patch catalog to synchronise between main thread and the
# isolated transaction.
catalog_tool_class = self.portal.portal_catalog.__class__
orig_catalogObjectList = catalog_tool_class.catalogObjectList
def catalogObjectList(*args, **kw):
# Note: rendez-vous/unblock_activity *before* modifying tables, otherwise
# unindexation's synchronous catalog update will deadlock:
# unindexation UPDATE -> indexation UPDATE -> unblock_activity -> unindexation commit
rendez_vous.set()
assert unblock_activity.wait(10), format_stack()
orig_catalogObjectList(*args, **kw)
catalog_tool_class.catalogObjectList = catalogObjectList
try:
# Let pending activities (indexation) start.
with TransactionThread(
portal_value=self.portal,
payload=partial(runValidablePendingActivities, node_id=2),
):
# Wait until indexation is indeed initiated.
assert rendez_vous.wait(10), format_stack()
# Delete object, which will try to modify catalog content and spawn
# unindexation activity.
person_module.manage_delObjects(ids=['4'])
self.commit()
# Try to run this activity. It should not run, as it must wait on
# indexation to be over.
runValidablePendingActivities(self.portal, 1)
# Let indexation carry on, it is still able to access the object.
unblock_activity.set()
finally:
# Un-monkey-patch.
catalog_tool_class.catalogObjectList = orig_catalogObjectList
# Document must be indexed: unindexation must have waited for indexation
# to finish, so runValidablePendingActivities(..., 1) must have been
# a no-op.
self.checkRelativeUrlInSQLPathList(path_list)
self.tic()
# And now it's gone.
self.checkRelativeUrlNotInSQLPathList(path_list)
def test_04_SearchFolderWithDeletedObjects(self): def test_04_SearchFolderWithDeletedObjects(self):
person_module = self.getPersonModule() person_module = self.getPersonModule()
......
...@@ -365,8 +365,8 @@ class CopyContainer: ...@@ -365,8 +365,8 @@ class CopyContainer:
except AttributeError: except AttributeError:
pass pass
else: else:
# Make sure there is not activity for this object # Cleanup any failed and spawned-during-current-transaction activities for this document.
self.flushActivity(invoke=0) self.flushActivity(invoke=0, only_safe=True)
uid = getattr(self,'uid',None) uid = getattr(self,'uid',None)
if uid is None: if uid is None:
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment