Commit c85a840f authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: slightly delay non-executed grouped messages

When grouped messages fail, ActivityTool must distinguish 3 groups,
in order to reexecute them separately, as follows:
- first, those that succeeded
- then, those that were skipped
- at last, failed ones

Grouping methods are updated to handle partial failures, and stop doing
anything when something goes wrong.

Without this, we would have the following pathological cases.

1. Let's suppose first that skipped messages are marked as succeeded.

The problem is that each skipped message that will fail causes the reexecution
of those that didn't fail.

Exemple: A:ok B:ok C:err D:err E:err F:err
  1: A:ok, B:ok, C:err, D:skipped, E:skipped, F:skipped
  2: A:ok, B:ok, D:err, E:skipped, F:skipped
  3: A:ok, B:ok, E:err, F:skipped
  4: A:ok, B:ok, F:err
  5: A:ok, B:ok -> commit

And worst, the first failed (C) may be processable again before 5, entering
a failing loop if it is executed again in the same group as A & B.

2. Another implementation is to mark all skipped as failed.

Example:
  1: A:ok, B:ok, C:err, D:skipped, E:skipped, F:skipped
  2: A:ok, B:ok -> commit
  3: C:err, D:skipped, E:skipped, F:skipped
 >3: same as 3

=> D, E or F are never tried.
parent 30fbdd3d
...@@ -33,7 +33,7 @@ from Shared.DC.ZRDB.Results import Results ...@@ -33,7 +33,7 @@ from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import ( from Products.CMFActivity.ActivityTool import (
Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED) Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
from Products.CMFActivity.ActivityRuntimeEnvironment import ( from Products.CMFActivity.ActivityRuntimeEnvironment import (
DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable) DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable)
...@@ -569,7 +569,8 @@ class SQLBase(Queue): ...@@ -569,7 +569,8 @@ class SQLBase(Queue):
if uid_to_duplicate_uid_list_dict is not None: if uid_to_duplicate_uid_list_dict is not None:
make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ()) make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
if (m.exc_type and # m.exc_type may be None if (m.exc_type and # m.exc_type may be None
m.conflict_retry and issubclass(m.exc_type, ConflictError)): (m.conflict_retry if issubclass(m.exc_type, ConflictError) else
m.exc_type is SkippedMessage)):
delay_uid_list.append(uid) delay_uid_list.append(uid)
else: else:
max_retry = m.max_retry max_retry = m.max_retry
......
...@@ -137,6 +137,10 @@ MESSAGE_EXECUTED = 1 ...@@ -137,6 +137,10 @@ MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2 MESSAGE_NOT_EXECUTABLE = 2
class SkippedMessage(Exception):
pass
class Message(BaseMessage): class Message(BaseMessage):
"""Activity Message Class. """Activity Message Class.
...@@ -417,6 +421,8 @@ Named Parameters: %r ...@@ -417,6 +421,8 @@ Named Parameters: %r
raise Exception, 'Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.' raise Exception, 'Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.'
except Exception: except Exception:
exc_info = sys.exc_info() exc_info = sys.exc_info()
elif exc_info[0] is SkippedMessage:
return
if log: if log:
LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info) LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
# push the error in ZODB error_log # push the error in ZODB error_log
...@@ -1285,7 +1291,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1285,7 +1291,7 @@ class ActivityTool (Folder, UniqueObject):
active_obj = subobj.activate(activity=activity, **activity_kw) active_obj = subobj.activate(activity=activity, **activity_kw)
getattr(active_obj, alternate_method_id)(*m.args, **m.kw) getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
else: else:
expanded_object_list.append([subobj, m.args, m.kw, None]) expanded_object_list.append([subobj, m.args, m.kw])
except: except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self) m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
...@@ -1294,9 +1300,12 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1294,9 +1300,12 @@ class ActivityTool (Folder, UniqueObject):
if len(expanded_object_list) > 0: if len(expanded_object_list) > 0:
traverse = self.getPortalObject().unrestrictedTraverse traverse = self.getPortalObject().unrestrictedTraverse
# FIXME: how to apply security here? # FIXME: how to apply security here?
# NOTE: expanded_object_list[*][3] must be updated by the callee: # NOTE: The callee must update each processed item of
# it must be deleted in case of failure, or updated with the # expanded_object_list, by appending:
# result to post on the active process otherwise. # - exc_info in case of error (so its length becomes 6)
# - None or the result to post on the active process otherwise
# (length=4)
# Skipped item must not be touched (length=3).
traverse(method_id)(expanded_object_list) traverse(method_id)(expanded_object_list)
except: except:
# In this case, the group method completely failed. # In this case, the group method completely failed.
...@@ -1329,7 +1338,10 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1329,7 +1338,10 @@ class ActivityTool (Folder, UniqueObject):
else: else:
m.setExecutionState(MESSAGE_EXECUTED, context=self) m.setExecutionState(MESSAGE_EXECUTED, context=self)
continue continue
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self) exc_info = result[3:]
m.setExecutionState(MESSAGE_NOT_EXECUTED,
tuple(exc_info) if exc_info else (SkippedMessage,),
context=self)
if self.activity_tracking: if self.activity_tracking:
activity_tracking_logger.info('invoked group messages') activity_tracking_logger.info('invoked group messages')
...@@ -1337,8 +1349,11 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1337,8 +1349,11 @@ class ActivityTool (Folder, UniqueObject):
class dummyGroupMethod(object): class dummyGroupMethod(object):
def __bobo_traverse__(self, REQUEST, method_id): def __bobo_traverse__(self, REQUEST, method_id):
def group_method(message_list): def group_method(message_list):
for m in message_list: try:
m[3] = getattr(m[0], method_id)(*m[1], **m[2]) for m in message_list:
m.append(getattr(m[0], method_id)(*m[1], **m[2]))
except Exception:
m += sys.exc_info()
return group_method return group_method
dummyGroupMethod = dummyGroupMethod() dummyGroupMethod = dummyGroupMethod()
......
...@@ -1417,8 +1417,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1417,8 +1417,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
foobar_list = [] foobar_list = []
def setFoobar(self, object_list): def setFoobar(self, object_list):
foobar_list.append(len(object_list)) foobar_list.append(len(object_list))
for obj, args, kw, _ in object_list: for m in object_list:
obj, args, kw = m
obj.foobar = getattr(obj.aq_base, 'foobar', 0) + kw.get('number', 1) obj.foobar = getattr(obj.aq_base, 'foobar', 0) + kw.get('number', 1)
m.append(None)
from Products.ERP5Type.Core.Folder import Folder from Products.ERP5Type.Core.Folder import Folder
Folder.setFoobar = setFoobar Folder.setFoobar = setFoobar
...@@ -3039,8 +3041,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3039,8 +3041,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.tic() self.tic()
group_method_call_list = [] group_method_call_list = []
def doSomething(self, message_list): def doSomething(self, message_list):
group_method_call_list.append(sorted((ob.getPath(), args, kw) r = []
for ob, args, kw, _ in message_list)) for m in message_list:
r.append((m[0].getPath(), m[1], m[2]))
m.append(None)
r.sort()
group_method_call_list.append(r)
activity_tool.__class__.doSomething = doSomething activity_tool.__class__.doSomething = doSomething
try: try:
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue':
...@@ -3258,7 +3264,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3258,7 +3264,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
merge_parent=c[(i-1)//2 or i].getPath(), merge_parent=c[(i-1)//2 or i].getPath(),
priority=priority, **kw).doSomething() priority=priority, **kw).doSomething()
def invokeGroup(self, message_list): def invokeGroup(self, message_list):
invoked.append(sorted(c.index(m[0]) for m in message_list)) r = []
for m in message_list:
r.append(c.index(m[0]))
m.append(None)
r.sort()
invoked.append(r)
category_tool.__class__.invokeGroup = invokeGroup category_tool.__class__.invokeGroup = invokeGroup
try: try:
activate(5, 0); activate(1, 1); check([1, 5]) activate(5, 0); activate(1, 1); check([1, 5])
...@@ -3380,6 +3391,40 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3380,6 +3391,40 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
organisation.reindexObject() organisation.reindexObject()
self.tic() self.tic()
def test_failingGroupMethod(self):
activity_tool = self.portal.portal_activities
obj = activity_tool.newActiveProcess()
self.tic()
obj.x = 1
def doSomething(self):
self.x %= self.x
obj.__class__.doSomething = doSomething
try:
activity_kw = dict(activity="SQLQueue", group_method_id=None)
obj.activate(**activity_kw).doSomething()
obj.activate(**activity_kw).doSomething()
obj.activate(**activity_kw).doSomething()
self.commit()
self.assertEqual(3, len(activity_tool.getMessageList()))
activity_tool.tic()
self.assertEqual(obj.x, 0)
skipped, failed = activity_tool.getMessageList()
self.assertEqual(0, skipped.retry)
self.assertEqual(1, failed.retry)
obj.x = 1
self.commit()
activity_tool.timeShift(VALIDATION_ERROR_DELAY)
activity_tool.tic()
m, = activity_tool.getMessageList()
self.assertEqual(1, failed.retry)
obj.x = 1
self.commit()
activity_tool.timeShift(VALIDATION_ERROR_DELAY)
activity_tool.tic()
self.assertFalse(activity_tool.getMessageList())
finally:
del obj.__class__.doSomething
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
# #
############################################################################## ##############################################################################
import sys
from collections import defaultdict
from zLOG import LOG, INFO from zLOG import LOG, INFO
from Products.ERP5Type.Tool.BaseTool import BaseTool from Products.ERP5Type.Tool.BaseTool import BaseTool
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
...@@ -140,12 +142,21 @@ class RuleTool(BaseTool): ...@@ -140,12 +142,21 @@ class RuleTool(BaseTool):
security.declarePrivate('updateSimulation') security.declarePrivate('updateSimulation')
@UnrestrictedMethod @UnrestrictedMethod
def updateSimulation(self, message_list): def updateSimulation(self, message_list):
expandable_dict = {} expandable_dict = defaultdict(list)
for m in message_list: for m in message_list:
expandable_dict.setdefault(m[0], {}).update(m[2]) expandable_dict[m[0]].append(m)
for expandable, kw in expandable_dict.iteritems(): for expandable, message_list in expandable_dict.iteritems():
LOG("RuleTool", INFO, "Updating simulation for %s: %r" try:
% (expandable.getPath(), kw)) kw = {}
expandable._updateSimulation(**kw) for m in message_list:
kw.update(m[2])
m.append(None)
LOG("RuleTool", INFO, "Updating simulation for %s: %r"
% (expandable.getPath(), kw))
expandable._updateSimulation(**kw)
except Exception:
exc_info = sys.exc_info()
for m in message_list:
m[3:] = exc_info
InitializeClass(RuleTool) InitializeClass(RuleTool)
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
# #
############################################################################## ##############################################################################
import sys
from copy import deepcopy from copy import deepcopy
from collections import defaultdict from collections import defaultdict
from Products.CMFCore.CatalogTool import CatalogTool as CMFCoreCatalogTool from Products.CMFCore.CatalogTool import CatalogTool as CMFCoreCatalogTool
...@@ -792,9 +793,12 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): ...@@ -792,9 +793,12 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
tmp_object_list = [x[0] for x in object_list] tmp_object_list = [x[0] for x in object_list]
super(CatalogTool, self).catalogObjectList(tmp_object_list, **m[2]) super(CatalogTool, self).catalogObjectList(tmp_object_list, **m[2])
if tmp_object_list: if tmp_object_list:
for x in object_list: exc_info = sys.exc_info()
if x[0] in tmp_object_list: for x in object_list:
del x[3] # no result means failed if x[0] in tmp_object_list:
x += exc_info # failed
else:
x.append(None) # success, no result
else: else:
super(CatalogTool, self).catalogObjectList(object_list, *args, **kw) super(CatalogTool, self).catalogObjectList(object_list, *args, **kw)
...@@ -803,8 +807,11 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): ...@@ -803,8 +807,11 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
"""Uncatalog a list of objects""" """Uncatalog a list of objects"""
# XXX: this is currently only a placeholder for further optimization # XXX: this is currently only a placeholder for further optimization
# (for the moment, it's not faster than the dummy group method) # (for the moment, it's not faster than the dummy group method)
for m in message_list: try:
self.unindexObject(*m[1], **m[2]) for m in message_list:
m.append(self.unindexObject(*m[1], **m[2]))
except Exception:
m += sys.exc_info()
security.declarePrivate('unindexObject') security.declarePrivate('unindexObject')
def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None): def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment