From 24109b57cd578599b926a9ccdea91312db9112a8 Mon Sep 17 00:00:00 2001 From: Vincent Pelletier Date: Tue, 17 Sep 2019 14:40:31 +0900 Subject: [PATCH 1/2] CMFActivity.ActivityTool: Use uid for identity check instead of oid. uids are a way one can signal that different objects (from a ZODB point of view, and hence an oid point of view) are actually to be considered as the same objet (from a higher abstration point of view). For example, another variant of the same object, as imported over an older variant. In turn, this allows extending the protection to activities spawned from brains, and not just from live objects. --- product/CMFActivity/ActivityTool.py | 42 +++++++++----------- product/CMFActivity/tests/testCMFActivity.py | 2 +- product/ERP5Type/CopySupport.py | 14 +++++++ product/ZSQLCatalog/Extensions/zsqlbrain.py | 2 +- 4 files changed, 34 insertions(+), 26 deletions(-) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 297b3a55f34..1affcf99ad3 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -194,13 +194,13 @@ class Message(BaseMessage): exc_type = None is_executed = MESSAGE_NOT_EXECUTED traceback = None - oid = None + document_uid = None is_registered = False def __init__( self, url, - oid, + document_uid, active_process, active_process_uid, activity_kw, @@ -210,7 +210,7 @@ class Message(BaseMessage): portal_activities=None, ): self.object_path = url - self.oid = oid + self.document_uid = document_uid self.active_process = active_process self.active_process_uid = active_process_uid self.activity_kw = activity_kw @@ -266,10 +266,8 @@ class Message(BaseMessage): % (self.object_path,), error=True) self.setExecutionState(MESSAGE_NOT_EXECUTABLE) else: - if (self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None) and - # XXX: BusinessTemplate must be fixed to preserve OID - 'portal_workflow' not in self.object_path): - raise ValueError("OID mismatch for %r" % obj) + if self.document_uid and self.document_uid != getattr(aq_base(obj), 'uid', None): + raise ValueError("UID mismatch for %r" % obj) return obj def getObjectList(self, activity_tool): @@ -506,7 +504,7 @@ class Method(object): __slots__ = ( '_portal_activities', '_passive_url', - '_passive_oid', + '_passive_uid', '_activity', '_active_process', '_active_process_uid', @@ -515,11 +513,11 @@ class Method(object): '_request', ) - def __init__(self, portal_activities, passive_url, passive_oid, activity, + def __init__(self, portal_activities, passive_url, passive_uid, activity, active_process, active_process_uid, kw, method_id, request): self._portal_activities = portal_activities self._passive_url = passive_url - self._passive_oid = passive_oid + self._passive_uid = passive_uid self._activity = activity self._active_process = active_process self._active_process_uid = active_process_uid @@ -531,7 +529,7 @@ class Method(object): portal_activities = self._portal_activities m = Message( url=self._passive_url, - oid=self._passive_oid, + document_uid=self._passive_uid, active_process=self._active_process, active_process_uid=self._active_process_uid, activity_kw=self._kw, @@ -552,7 +550,7 @@ class ActiveWrapper(object): __slots__ = ( '__portal_activities', '__passive_url', - '__passive_oid', + '__passive_uid', '__activity', '__active_process', '__active_process_uid', @@ -562,12 +560,12 @@ class ActiveWrapper(object): # Shortcut security lookup (avoid calling __getattr__) __parent__ = None - def __init__(self, portal_activities, url, oid, activity, active_process, + def __init__(self, portal_activities, url, uid, activity, active_process, active_process_uid, kw, request): # second parameter can be an object or an object's path self.__portal_activities = portal_activities self.__passive_url = url - self.__passive_oid = oid + self.__passive_uid = uid self.__activity = activity self.__active_process = active_process self.__active_process_uid = active_process_uid @@ -578,7 +576,7 @@ class ActiveWrapper(object): return Method( self.__portal_activities, self.__passive_url, - self.__passive_oid, + self.__passive_uid, self.__activity, self.__active_process, self.__active_process_uid, @@ -1396,7 +1394,7 @@ class ActivityTool (BaseTool): def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, serialization_tag=None, - node=None, **kw): + node=None, uid=None, **kw): if active_process is None: active_process_uid = None elif isinstance(active_process, str): @@ -1406,15 +1404,11 @@ class ActivityTool (BaseTool): active_process_uid = active_process.getUid() active_process = active_process.getPhysicalPath() if isinstance(object, str): - oid = None url = tuple(object.split('/')) else: - try: - oid = aq_base(object)._p_oid - # Note that it's too early to get the OID of a newly created object, - # so at this point, self.oid may still be None. - except AttributeError: - pass + if uid is not None: + raise ValueError + uid = getattr(aq_base(object), 'uid', None) url = object.getPhysicalPath() if serialization_tag is not None: kw['serialization_tag'] = serialization_tag @@ -1438,7 +1432,7 @@ class ActivityTool (BaseTool): except ValueError: pass break - return ActiveWrapper(self, url, oid, activity, + return ActiveWrapper(self, url, uid, activity, active_process, active_process_uid, kw, getattr(self, 'REQUEST', None)) diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 80d063608e0..e6db717bb84 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2342,7 +2342,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.assertEqual(1, activity_tool.countMessage()) self.flushAllActivities() sender, recipients, mail = message_list.pop() - self.assertIn('OID mismatch', mail) + self.assertIn('UID mismatch', mail) m, = activity_tool.getMessageList() self.assertEqual(m.processing_node, INVOKE_ERROR_STATE) obj.flushActivity() diff --git a/product/ERP5Type/CopySupport.py b/product/ERP5Type/CopySupport.py index 6e3b7cba663..119b2d43183 100644 --- a/product/ERP5Type/CopySupport.py +++ b/product/ERP5Type/CopySupport.py @@ -498,6 +498,20 @@ class CopyContainer: new_ob.setDefaultReindexParameterDict(reindex_kw) if is_indexable is not None and not is_indexable: new_ob._setNonIndexable() + if is_clone: + # Clear uids before spawning indexation activities, so the activity does + # not do an uid check (as the uid is still the old one, and a new one + # will be allocated by _postDuplicate, which must run after object gets + # a parent). + # Note: do not use __recurse as it only iterates over immediate content, + # and then stop instead of calling itself into them. It relies on called + # methods to call it back, and we do not want that for _setUid . + todo_list = [new_ob] + while todo_list: + document = todo_list.pop() + todo_list.extend(document.objectValues()) + todo_list.extend(document.opaqueValues()) + document._setUid(None) self._setObject(new_id, new_ob, set_owner=set_owner) new_ob = self._getOb(new_id) new_ob._postCopy(self, op=op) diff --git a/product/ZSQLCatalog/Extensions/zsqlbrain.py b/product/ZSQLCatalog/Extensions/zsqlbrain.py index 325a1d5a069..54accfbd7f5 100644 --- a/product/ZSQLCatalog/Extensions/zsqlbrain.py +++ b/product/ZSQLCatalog/Extensions/zsqlbrain.py @@ -137,7 +137,7 @@ class ZSQLBrain(Acquisition.Implicit): # a queue can be provided as well as extra parameters # which can be used for example to define deferred tasks return activity_tool.activateObject( - self.getPath(), activity, active_process, **new_kw) + self.getPath(), activity, active_process, uid=self.getUid(), **new_kw) allow_class(ZSQLBrain) -- 2.30.9 From 2695b849fdd8dec2bee80ba95f6862a7a42a0668 Mon Sep 17 00:00:00 2001 From: Vincent Pelletier Date: Tue, 28 May 2019 17:57:54 +0900 Subject: [PATCH 2/2] EPR5Type.CopySupport: Only delete activities for deleted document which can be safely deleted. Safe activities are: - those which are not visible outside current transaction (IOW, queued in activity buffer) - those which are already marked as failed, as no activity node will try to process these Other activities may already be running. Note: deleting even failed activities may violate activity dependencies (other activities may be waiting on the ones being deleted, even those not spawned yet), but if this is an issue then some other code should take care of these as well. Also, rework a CMFActivity test checking that activties already-spawned on a later-deleted context get discarded instead of being executed: previous implementation was working around the activity deletion removed in this commit. Also, test indexation/unindexation parallelism. --- product/CMFActivity/Activity/SQLBase.py | 4 +- product/CMFActivity/tests/testCMFActivity.py | 11 +- product/ERP5Catalog/tests/testERP5Catalog.py | 161 ++++++++++++++++++- product/ERP5Type/CopySupport.py | 4 +- 4 files changed, 173 insertions(+), 7 deletions(-) diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py index b6622d741f8..9a08b8b24a5 100644 --- a/product/CMFActivity/Activity/SQLBase.py +++ b/product/CMFActivity/Activity/SQLBase.py @@ -806,7 +806,7 @@ CREATE TABLE %s ( self._log(WARNING, 'Exception during notification phase of finalizeMessageExecution') - def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + def flush(self, activity_tool, object_path, invoke=0, method_id=None, only_safe=False, **kw): """ object_path is a tuple """ @@ -843,6 +843,8 @@ CREATE TABLE %s ( db = activity_tool.getSQLConnection() for line in self._getMessageList(db, path=path, **({'method_id': method_id} if method_id else {})): + if only_safe and line.processing_node > -2: + continue uid_list.append(line.uid) if invoke and line.processing_node <= 0: invoke(Message.load(line.message, uid=line.uid, line=line)) diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index e6db717bb84..6a919c8f6df 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -1125,6 +1125,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): def testTryUserNotificationOnActivityFailure(self, activity): message_list = self.portal.MailHost._message_list del message_list[:] + portal_activities = self.portal.portal_activities + countMessage = portal_activities.countMessage obj = self.portal.organisation_module.newContent(portal_type='Organisation') self.tic() def failingMethod(self): raise ValueError('This method always fails') @@ -1140,13 +1142,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.assertIn("Module %s, line %s, in failingMethod" % ( __name__, inspect.getsourcelines(failingMethod)[1]), mail) self.assertIn("ValueError:", mail) + portal_activities.manageClearActivities() # MESSAGE_NOT_EXECUTABLE - obj.getParentValue()._delObject(obj.getId()) + obj_path = obj.getPath() obj.activate(activity=activity).failingMethod() self.commit() - self.assertTrue(obj.hasActivity()) + obj.getParentValue()._delObject(obj.getId()) + self.commit() + self.assertGreater(countMessage(path=obj_path), 0) self.tic() - self.assertFalse(obj.hasActivity()) + self.assertEqual(countMessage(path=obj_path), 0) self.assertFalse(message_list) finally: del Organisation.failingMethod diff --git a/product/ERP5Catalog/tests/testERP5Catalog.py b/product/ERP5Catalog/tests/testERP5Catalog.py index c5b1232e225..b4fea7a438f 100644 --- a/product/ERP5Catalog/tests/testERP5Catalog.py +++ b/product/ERP5Catalog/tests/testERP5Catalog.py @@ -27,16 +27,21 @@ # ############################################################################## +from functools import partial +import httplib from random import randint import sys +import threading +import traceback import unittest -import httplib +import six from AccessControl import getSecurityManager from AccessControl.SecurityManagement import newSecurityManager from Acquisition import aq_base from DateTime import DateTime from _mysql_exceptions import ProgrammingError from OFS.ObjectManager import ObjectManager +from Products.CMFActivity import ActivityTool from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.utils import LogInterceptor, createZODBPythonScript, todo_erp5, getExtraSqlConnectionStringList from Products.PageTemplates.Expressions import getEngine @@ -44,6 +49,100 @@ from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery from Testing import ZopeTestCase from zLOG import LOG +def format_stack(thread=None): + frame_dict = sys._current_frames() + if thread is not None: + thread_id = thread.ident + frame_dict = { + thread_id: frame_dict[thread_id], + } + frame = None + try: + return ''.join(( + 'Thread %s\n %s' % ( + thread_id, + ' '.join(traceback.format_stack(frame)), + ) + for thread_id, frame in frame_dict.iteritems() + )) + finally: + del frame, frame_dict + +class TransactionThread(threading.Thread): + """ + Run payload(portal_value=portal_value) within a separate transaction. + Note: because of transaction isolation, given portal_value will be a + different instance of the same persistent object. + + Instances of this class may be used as a context manager to manage thread + lifespam, especially to be properly informed of any exception which happened + during thread's life. In which case, join_timeout is used upon context exit. + """ + def __init__(self, portal_value, payload, join_timeout=10): + super(TransactionThread, self).__init__() + self.daemon = True + self.zodb = portal_value._p_jar.db() + self.root_physical_path = portal_value.getPhysicalPath() + self.payload = payload + self.exception = None + self.join_timeout = join_timeout + + def run(self): + try: + # Get a new portal, in a new transactional connection bound to default + # transaction manager (which should be the threaded transaction manager). + portal_value = self.zodb.open().root()['Application'].unrestrictedTraverse( + self.root_physical_path, + ) + # Trigger ERP5Site magic + portal_value.getSiteManager() + # Trigger skin magic + portal_value.changeSkin(None) + # Login + newSecurityManager(None, portal_value.acl_users.getUser('ERP5TypeTestCase')) + self.payload(portal_value=portal_value) + except Exception as self.exception: + if six.PY2: + self.exception.__traceback__ = sys.exc_info()[2] + + def join(self, *args, **kw): + super(TransactionThread, self).join(*args, **kw) + if not self.is_alive(): + exception = self.exception + # Break reference cycle: + # run frame -> self -> exception -> __traceback__ -> run frame + # Not re-raising on subsequent calls is kind of a bug, but it's really up + # to caller to either not ignore exceptions or keep them around. + self.exception = None + if exception is not None: + if six.PY3: + raise exception + six.reraise(exception, None, exception.__traceback__) + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.join(self.join_timeout) + # Note: if context was interrupted by an exception, being unable to join + # the thread may be unavoidable (ex: context could not signal blocked + # thread), in which case this assertion will be mostly useless noise. + # But conditionally ignoring this seems worse. + assert not self.is_alive(), format_stack(self) + except Exception as join_exc_val: + if exc_val is None: + # No exception from context: just propagate exception + raise + # Both an exception from context and an exception in thread + if six.PY3: + # PY3: "raise join_exc_val from exc_val" + six.raise_from(join_exc_val, exc_val) + # PY2, handle our exception ourselves and let interpreter reraise + # context's. + traceback.print_exc() + class IndexableDocument(ObjectManager): # This tests uses a simple ObjectManager, but ERP5Catalog only # support classes inherting from ERP5Type.Base. @@ -230,6 +329,66 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): self.checkRelativeUrlNotInSQLPathList(path_list) self.tic() self.checkRelativeUrlNotInSQLPathList(path_list) + # Now delete document while its indexation is running + # (both started and not committed yet). + # First, create a person but do not index it. + person = person_module.newContent(id='4', portal_type='Person') + path_list = [person.getRelativeUrl()] + self.commit() + self.checkRelativeUrlNotInSQLPathList(path_list) + rendez_vous = threading.Event() + unblock_activity = threading.Event() + # Prepare an isolated transaction to act as one activity node. + def runValidablePendingActivities(portal_value, node_id): + """ + Validate messages once, execute whatever is immediately executable. + """ + activity_tool = portal_value.portal_activities + activity_tool.distribute() + # XXX: duplicate ActivityTool.tic, without locking as we are being + # multiple activity nodes in a single process. + for activity in ActivityTool.activity_dict.itervalues(): + while not activity.dequeueMessage(activity_tool, node_id, ()): + pass + # Monkey-patch catalog to synchronise between main thread and the + # isolated transaction. + catalog_tool_class = self.portal.portal_catalog.__class__ + orig_catalogObjectList = catalog_tool_class.catalogObjectList + def catalogObjectList(*args, **kw): + # Note: rendez-vous/unblock_activity *before* modifying tables, otherwise + # unindexation's synchronous catalog update will deadlock: + # unindexation UPDATE -> indexation UPDATE -> unblock_activity -> unindexation commit + rendez_vous.set() + assert unblock_activity.wait(10), format_stack() + orig_catalogObjectList(*args, **kw) + catalog_tool_class.catalogObjectList = catalogObjectList + try: + # Let pending activities (indexation) start. + with TransactionThread( + portal_value=self.portal, + payload=partial(runValidablePendingActivities, node_id=2), + ): + # Wait until indexation is indeed initiated. + assert rendez_vous.wait(10), format_stack() + # Delete object, which will try to modify catalog content and spawn + # unindexation activity. + person_module.manage_delObjects(ids=['4']) + self.commit() + # Try to run this activity. It should not run, as it must wait on + # indexation to be over. + runValidablePendingActivities(self.portal, 1) + # Let indexation carry on, it is still able to access the object. + unblock_activity.set() + finally: + # Un-monkey-patch. + catalog_tool_class.catalogObjectList = orig_catalogObjectList + # Document must be indexed: unindexation must have waited for indexation + # to finish, so runValidablePendingActivities(..., 1) must have been + # a no-op. + self.checkRelativeUrlInSQLPathList(path_list) + self.tic() + # And now it's gone. + self.checkRelativeUrlNotInSQLPathList(path_list) def test_04_SearchFolderWithDeletedObjects(self): person_module = self.getPersonModule() diff --git a/product/ERP5Type/CopySupport.py b/product/ERP5Type/CopySupport.py index 119b2d43183..a71b76d4258 100644 --- a/product/ERP5Type/CopySupport.py +++ b/product/ERP5Type/CopySupport.py @@ -365,8 +365,8 @@ class CopyContainer: except AttributeError: pass else: - # Make sure there is not activity for this object - self.flushActivity(invoke=0) + # Cleanup any failed and spawned-during-current-transaction activities for this document. + self.flushActivity(invoke=0, only_safe=True) uid = getattr(self,'uid',None) if uid is None: return -- 2.30.9