diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py index b6622d741f8158d72efe7c972d4d938899e0a53d..9a08b8b24a5be82398c9afa80dae9b69ee5c89bf 100644 --- a/product/CMFActivity/Activity/SQLBase.py +++ b/product/CMFActivity/Activity/SQLBase.py @@ -806,7 +806,7 @@ CREATE TABLE %s ( self._log(WARNING, 'Exception during notification phase of finalizeMessageExecution') - def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + def flush(self, activity_tool, object_path, invoke=0, method_id=None, only_safe=False, **kw): """ object_path is a tuple """ @@ -843,6 +843,8 @@ CREATE TABLE %s ( db = activity_tool.getSQLConnection() for line in self._getMessageList(db, path=path, **({'method_id': method_id} if method_id else {})): + if only_safe and line.processing_node > -2: + continue uid_list.append(line.uid) if invoke and line.processing_node <= 0: invoke(Message.load(line.message, uid=line.uid, line=line)) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 297b3a55f348c8469798312d00f5f14c1fa85fcf..1affcf99ad3dc1bd7b7447cf7cbba00ddffe0470 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -194,13 +194,13 @@ class Message(BaseMessage): exc_type = None is_executed = MESSAGE_NOT_EXECUTED traceback = None - oid = None + document_uid = None is_registered = False def __init__( self, url, - oid, + document_uid, active_process, active_process_uid, activity_kw, @@ -210,7 +210,7 @@ class Message(BaseMessage): portal_activities=None, ): self.object_path = url - self.oid = oid + self.document_uid = document_uid self.active_process = active_process self.active_process_uid = active_process_uid self.activity_kw = activity_kw @@ -266,10 +266,8 @@ class Message(BaseMessage): % (self.object_path,), error=True) self.setExecutionState(MESSAGE_NOT_EXECUTABLE) else: - if (self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None) and - # XXX: BusinessTemplate must be fixed to preserve OID - 'portal_workflow' not in self.object_path): - raise ValueError("OID mismatch for %r" % obj) + if self.document_uid and self.document_uid != getattr(aq_base(obj), 'uid', None): + raise ValueError("UID mismatch for %r" % obj) return obj def getObjectList(self, activity_tool): @@ -506,7 +504,7 @@ class Method(object): __slots__ = ( '_portal_activities', '_passive_url', - '_passive_oid', + '_passive_uid', '_activity', '_active_process', '_active_process_uid', @@ -515,11 +513,11 @@ class Method(object): '_request', ) - def __init__(self, portal_activities, passive_url, passive_oid, activity, + def __init__(self, portal_activities, passive_url, passive_uid, activity, active_process, active_process_uid, kw, method_id, request): self._portal_activities = portal_activities self._passive_url = passive_url - self._passive_oid = passive_oid + self._passive_uid = passive_uid self._activity = activity self._active_process = active_process self._active_process_uid = active_process_uid @@ -531,7 +529,7 @@ class Method(object): portal_activities = self._portal_activities m = Message( url=self._passive_url, - oid=self._passive_oid, + document_uid=self._passive_uid, active_process=self._active_process, active_process_uid=self._active_process_uid, activity_kw=self._kw, @@ -552,7 +550,7 @@ class ActiveWrapper(object): __slots__ = ( '__portal_activities', '__passive_url', - '__passive_oid', + '__passive_uid', '__activity', '__active_process', '__active_process_uid', @@ -562,12 +560,12 @@ class ActiveWrapper(object): # Shortcut security lookup (avoid calling __getattr__) __parent__ = None - def __init__(self, portal_activities, url, oid, activity, active_process, + def __init__(self, portal_activities, url, uid, activity, active_process, active_process_uid, kw, request): # second parameter can be an object or an object's path self.__portal_activities = portal_activities self.__passive_url = url - self.__passive_oid = oid + self.__passive_uid = uid self.__activity = activity self.__active_process = active_process self.__active_process_uid = active_process_uid @@ -578,7 +576,7 @@ class ActiveWrapper(object): return Method( self.__portal_activities, self.__passive_url, - self.__passive_oid, + self.__passive_uid, self.__activity, self.__active_process, self.__active_process_uid, @@ -1396,7 +1394,7 @@ class ActivityTool (BaseTool): def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, serialization_tag=None, - node=None, **kw): + node=None, uid=None, **kw): if active_process is None: active_process_uid = None elif isinstance(active_process, str): @@ -1406,15 +1404,11 @@ class ActivityTool (BaseTool): active_process_uid = active_process.getUid() active_process = active_process.getPhysicalPath() if isinstance(object, str): - oid = None url = tuple(object.split('/')) else: - try: - oid = aq_base(object)._p_oid - # Note that it's too early to get the OID of a newly created object, - # so at this point, self.oid may still be None. - except AttributeError: - pass + if uid is not None: + raise ValueError + uid = getattr(aq_base(object), 'uid', None) url = object.getPhysicalPath() if serialization_tag is not None: kw['serialization_tag'] = serialization_tag @@ -1438,7 +1432,7 @@ class ActivityTool (BaseTool): except ValueError: pass break - return ActiveWrapper(self, url, oid, activity, + return ActiveWrapper(self, url, uid, activity, active_process, active_process_uid, kw, getattr(self, 'REQUEST', None)) diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 80d063608e0bd47be26abd8fe0fa36985874bf63..6a919c8f6df4980f1c6fbdead4b628c2941de283 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -1125,6 +1125,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): def testTryUserNotificationOnActivityFailure(self, activity): message_list = self.portal.MailHost._message_list del message_list[:] + portal_activities = self.portal.portal_activities + countMessage = portal_activities.countMessage obj = self.portal.organisation_module.newContent(portal_type='Organisation') self.tic() def failingMethod(self): raise ValueError('This method always fails') @@ -1140,13 +1142,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.assertIn("Module %s, line %s, in failingMethod" % ( __name__, inspect.getsourcelines(failingMethod)[1]), mail) self.assertIn("ValueError:", mail) + portal_activities.manageClearActivities() # MESSAGE_NOT_EXECUTABLE - obj.getParentValue()._delObject(obj.getId()) + obj_path = obj.getPath() obj.activate(activity=activity).failingMethod() self.commit() - self.assertTrue(obj.hasActivity()) + obj.getParentValue()._delObject(obj.getId()) + self.commit() + self.assertGreater(countMessage(path=obj_path), 0) self.tic() - self.assertFalse(obj.hasActivity()) + self.assertEqual(countMessage(path=obj_path), 0) self.assertFalse(message_list) finally: del Organisation.failingMethod @@ -2342,7 +2347,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.assertEqual(1, activity_tool.countMessage()) self.flushAllActivities() sender, recipients, mail = message_list.pop() - self.assertIn('OID mismatch', mail) + self.assertIn('UID mismatch', mail) m, = activity_tool.getMessageList() self.assertEqual(m.processing_node, INVOKE_ERROR_STATE) obj.flushActivity() diff --git a/product/ERP5Catalog/tests/testERP5Catalog.py b/product/ERP5Catalog/tests/testERP5Catalog.py index c5b1232e22558346ae88d2c24daefd33690d6e50..b4fea7a438fb61930aa684255bc4a505755886fe 100644 --- a/product/ERP5Catalog/tests/testERP5Catalog.py +++ b/product/ERP5Catalog/tests/testERP5Catalog.py @@ -27,16 +27,21 @@ # ############################################################################## +from functools import partial +import httplib from random import randint import sys +import threading +import traceback import unittest -import httplib +import six from AccessControl import getSecurityManager from AccessControl.SecurityManagement import newSecurityManager from Acquisition import aq_base from DateTime import DateTime from _mysql_exceptions import ProgrammingError from OFS.ObjectManager import ObjectManager +from Products.CMFActivity import ActivityTool from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList from Products.PageTemplates.Expressions import getEngine @@ -44,6 +49,100 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery from Testing import ZopeTestCase from zLOG import LOG +def format_stack(thread=None): + frame_dict = sys._current_frames() + if thread is not None: + thread_id = thread.ident + frame_dict = { + thread_id: frame_dict[thread_id], + } + frame = None + try: + return ''.join(( + 'Thread %s\n %s' % ( + thread_id, + ' '.join(traceback.format_stack(frame)), + ) + for thread_id, frame in frame_dict.iteritems() + )) + finally: + del frame, frame_dict + +class TransactionThread(threading.Thread): + """ + Run payload(portal_value=portal_value) within a separate transaction. + Note: because of transaction isolation, given portal_value will be a + different instance of the same persistent object. + + Instances of this class may be used as a context manager to manage thread + lifespam, especially to be properly informed of any exception which happened + during thread's life. In which case, join_timeout is used upon context exit. + """ + def __init__(self, portal_value, payload, join_timeout=10): + super(TransactionThread, self).__init__() + self.daemon = True + self.zodb = portal_value._p_jar.db() + self.root_physical_path = portal_value.getPhysicalPath() + self.payload = payload + self.exception = None + self.join_timeout = join_timeout + + def run(self): + try: + # Get a new portal, in a new transactional connection bound to default + # transaction manager (which should be the threaded transaction manager). + portal_value = self.zodb.open().root()['Application'].unrestrictedTraverse( + self.root_physical_path, + ) + # Trigger ERP5Site magic + portal_value.getSiteManager() + # Trigger skin magic + portal_value.changeSkin(None) + # Login + newSecurityManager(None, portal_value.acl_users.getUser('ERP5TypeTestCase')) + self.payload(portal_value=portal_value) + except Exception as self.exception: + if six.PY2: + self.exception.__traceback__ = sys.exc_info()[2] + + def join(self, *args, **kw): + super(TransactionThread, self).join(*args, **kw) + if not self.is_alive(): + exception = self.exception + # Break reference cycle: + # run frame -> self -> exception -> __traceback__ -> run frame + # Not re-raising on subsequent calls is kind of a bug, but it's really up + # to caller to either not ignore exceptions or keep them around. + self.exception = None + if exception is not None: + if six.PY3: + raise exception + six.reraise(exception, None, exception.__traceback__) + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.join(self.join_timeout) + # Note: if context was interrupted by an exception, being unable to join + # the thread may be unavoidable (ex: context could not signal blocked + # thread), in which case this assertion will be mostly useless noise. + # But conditionally ignoring this seems worse. + assert not self.is_alive(), format_stack(self) + except Exception as join_exc_val: + if exc_val is None: + # No exception from context: just propagate exception + raise + # Both an exception from context and an exception in thread + if six.PY3: + # PY3: "raise join_exc_val from exc_val" + six.raise_from(join_exc_val, exc_val) + # PY2, handle our exception ourselves and let interpreter reraise + # context's. + traceback.print_exc() + class IndexableDocument(ObjectManager): # This tests uses a simple ObjectManager, but ERP5Catalog only # support classes inherting from ERP5Type.Base. @@ -230,6 +329,66 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): self.checkRelativeUrlNotInSQLPathList(path_list) self.tic() self.checkRelativeUrlNotInSQLPathList(path_list) + # Now delete document while its indexation is running + # (both started and not committed yet). + # First, create a person but do not index it. + person = person_module.newContent(id='4', portal_type='Person') + path_list = [person.getRelativeUrl()] + self.commit() + self.checkRelativeUrlNotInSQLPathList(path_list) + rendez_vous = threading.Event() + unblock_activity = threading.Event() + # Prepare an isolated transaction to act as one activity node. + def runValidablePendingActivities(portal_value, node_id): + """ + Validate messages once, execute whatever is immediately executable. + """ + activity_tool = portal_value.portal_activities + activity_tool.distribute() + # XXX: duplicate ActivityTool.tic, without locking as we are being + # multiple activity nodes in a single process. + for activity in ActivityTool.activity_dict.itervalues(): + while not activity.dequeueMessage(activity_tool, node_id, ()): + pass + # Monkey-patch catalog to synchronise between main thread and the + # isolated transaction. + catalog_tool_class = self.portal.portal_catalog.__class__ + orig_catalogObjectList = catalog_tool_class.catalogObjectList + def catalogObjectList(*args, **kw): + # Note: rendez-vous/unblock_activity *before* modifying tables, otherwise + # unindexation's synchronous catalog update will deadlock: + # unindexation UPDATE -> indexation UPDATE -> unblock_activity -> unindexation commit + rendez_vous.set() + assert unblock_activity.wait(10), format_stack() + orig_catalogObjectList(*args, **kw) + catalog_tool_class.catalogObjectList = catalogObjectList + try: + # Let pending activities (indexation) start. + with TransactionThread( + portal_value=self.portal, + payload=partial(runValidablePendingActivities, node_id=2), + ): + # Wait until indexation is indeed initiated. + assert rendez_vous.wait(10), format_stack() + # Delete object, which will try to modify catalog content and spawn + # unindexation activity. + person_module.manage_delObjects(ids=['4']) + self.commit() + # Try to run this activity. It should not run, as it must wait on + # indexation to be over. + runValidablePendingActivities(self.portal, 1) + # Let indexation carry on, it is still able to access the object. + unblock_activity.set() + finally: + # Un-monkey-patch. + catalog_tool_class.catalogObjectList = orig_catalogObjectList + # Document must be indexed: unindexation must have waited for indexation + # to finish, so runValidablePendingActivities(..., 1) must have been + # a no-op. + self.checkRelativeUrlInSQLPathList(path_list) + self.tic() + # And now it's gone. + self.checkRelativeUrlNotInSQLPathList(path_list) def test_04_SearchFolderWithDeletedObjects(self): person_module = self.getPersonModule() diff --git a/product/ERP5Type/CopySupport.py b/product/ERP5Type/CopySupport.py index 6e3b7cba663751132b64e63841960ead83e5794e..a71b76d4258c85cef311641690e54ab9eb842b97 100644 --- a/product/ERP5Type/CopySupport.py +++ b/product/ERP5Type/CopySupport.py @@ -365,8 +365,8 @@ class CopyContainer: except AttributeError: pass else: - # Make sure there is not activity for this object - self.flushActivity(invoke=0) + # Cleanup any failed and spawned-during-current-transaction activities for this document. + self.flushActivity(invoke=0, only_safe=True) uid = getattr(self,'uid',None) if uid is None: return @@ -498,6 +498,20 @@ class CopyContainer: new_ob.setDefaultReindexParameterDict(reindex_kw) if is_indexable is not None and not is_indexable: new_ob._setNonIndexable() + if is_clone: + # Clear uids before spawning indexation activities, so the activity does + # not do an uid check (as the uid is still the old one, and a new one + # will be allocated by _postDuplicate, which must run after object gets + # a parent). + # Note: do not use __recurse as it only iterates over immediate content, + # and then stop instead of calling itself into them. It relies on called + # methods to call it back, and we do not want that for _setUid . + todo_list = [new_ob] + while todo_list: + document = todo_list.pop() + todo_list.extend(document.objectValues()) + todo_list.extend(document.opaqueValues()) + document._setUid(None) self._setObject(new_id, new_ob, set_owner=set_owner) new_ob = self._getOb(new_id) new_ob._postCopy(self, op=op) diff --git a/product/ZSQLCatalog/Extensions/zsqlbrain.py b/product/ZSQLCatalog/Extensions/zsqlbrain.py index 325a1d5a06941a588f9484863e9c5e05f40d732c..54accfbd7f52d5907cfc0338a64c832613f57c87 100644 --- a/product/ZSQLCatalog/Extensions/zsqlbrain.py +++ b/product/ZSQLCatalog/Extensions/zsqlbrain.py @@ -137,7 +137,7 @@ class ZSQLBrain(Acquisition.Implicit): # a queue can be provided as well as extra parameters # which can be used for example to define deferred tasks return activity_tool.activateObject( - self.getPath(), activity, active_process, **new_kw) + self.getPath(), activity, active_process, uid=self.getUid(), **new_kw) allow_class(ZSQLBrain)