Commit 1ac567bf authored by Julien Muchembled's avatar Julien Muchembled

Allow executed activity to decide how to finalize message execution

getActivityRuntimeEnvironment is changed to return an ActivityRuntimeEnvironment
instance instead of a dict (and this value is now stored in a transactional
variable, for automatic cleanup).
This object allow activities to change default behaviour of CMFActivity if an
error happens. In the future, this object could also allow executed activity to
inspect its related Message object.

In case of infinite retry, notify the user when the default limit is reached.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@32879 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 53c7fc56
...@@ -35,8 +35,6 @@ from Products.CMFActivity.ActiveObject import ( ...@@ -35,8 +35,6 @@ from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE) INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Queue import VALIDATION_ERROR_DELAY from Queue import VALIDATION_ERROR_DELAY
MAX_RETRY = 5
class SQLBase: class SQLBase:
""" """
...@@ -166,16 +164,24 @@ class SQLBase: ...@@ -166,16 +164,24 @@ class SQLBase:
# please, remove the "type(m.exc_type) is type(ConflictError)" check # please, remove the "type(m.exc_type) is type(ConflictError)" check
# and leave only the "issubclass(m.exc_type, ConflictError)" check. # and leave only the "issubclass(m.exc_type, ConflictError)" check.
if type(m.exc_type) is type(ConflictError) and \ if type(m.exc_type) is type(ConflictError) and \
issubclass(m.exc_type, ConflictError): m.conflict_retry and issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid) delay_uid_list.append(uid)
else: else:
max_retry = m.max_retry
retry = m.line.retry retry = m.line.retry
if retry >= MAX_RETRY: if max_retry is not None and retry >= max_retry:
# Always notify when we stop retrying.
notify_user_list.append(m) notify_user_list.append(m)
final_error_uid_list.append(uid) final_error_uid_list.append(uid)
continue continue
# By default, make delay quadratic to the number of retries. # In case of infinite retry, notify the user
delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2 # when the default limit is reached.
if max_retry is None and retry == m.__class__.max_retry:
notify_user_list.append(m)
delay = m.delay
if delay is None:
# By default, make delay quadratic to the number of retries.
delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2
try: try:
# Immediately update, because values different for every message # Immediately update, because values different for every message
activity_tool.SQLBase_reactivate(table=self.sql_table, activity_tool.SQLBase_reactivate(table=self.sql_table,
......
...@@ -36,7 +36,8 @@ import sys ...@@ -36,7 +36,8 @@ import sys
from types import ClassType from types import ClassType
#from time import time #from time import time
from SQLBase import SQLBase from SQLBase import SQLBase
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
try: try:
...@@ -315,19 +316,15 @@ class SQLDict(RAMDict, SQLBase): ...@@ -315,19 +316,15 @@ class SQLDict(RAMDict, SQLBase):
# Remove group_id parameter from group_method_id # Remove group_id parameter from group_method_id
if group_method_id is not None: if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0] group_method_id = group_method_id.split('\0')[0]
clearActivityRuntimeEnvironment()
if group_method_id not in (None, ""): if group_method_id not in (None, ""):
setActivityRuntimeValue('group_method_id', group_method_id)
method = activity_tool.invokeGroup method = activity_tool.invokeGroup
args = (group_method_id, message_list) args = (group_method_id, message_list)
activity_runtime_environment = ActivityRuntimeEnvironment(None)
else: else:
method = activity_tool.invoke method = activity_tool.invoke
message = message_list[0] message = message_list[0]
args = (message, ) args = (message, )
updateActivityRuntimeValue({'activity_kw': message.activity_kw, activity_runtime_environment = ActivityRuntimeEnvironment(message)
'priority': message.line.priority,
'uid': message.uid})
setActivityRuntimeValue('processing_node', processing_node)
# Commit right before executing messages. # Commit right before executing messages.
# As MySQL transaction does not start exactly at the same time as ZODB # As MySQL transaction does not start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called # transactions but a bit later, messages available might be called
...@@ -336,6 +333,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -336,6 +333,8 @@ class SQLDict(RAMDict, SQLBase):
# So all connectors must be committed now that we have selected # So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects. # everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit() get_transaction().commit()
tv = getTransactionalVariable(None)
tv['activity_runtime_environment'] = activity_runtime_environment
# Try to invoke # Try to invoke
try: try:
method(*args) method(*args)
......
...@@ -37,7 +37,8 @@ import sys ...@@ -37,7 +37,8 @@ import sys
from time import time from time import time
from sets import ImmutableSet from sets import ImmutableSet
from SQLBase import SQLBase from SQLBase import SQLBase
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
try: try:
...@@ -214,13 +215,10 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -214,13 +215,10 @@ class SQLQueue(RAMQueue, SQLBase):
# So all connectors must be committed now that we have selected # So all connectors must be committed now that we have selected
# everything needed from MySQL to get a fresh view of ZODB objects. # everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit() get_transaction().commit()
tv = getTransactionalVariable(None)
for m in message_list: for m in message_list:
tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m)
processed_count += 1 processed_count += 1
clearActivityRuntimeEnvironment()
updateActivityRuntimeValue({'processing_node': processing_node,
'activity_kw': m.activity_kw,
'priority': m.line.priority,
'uid': m.uid})
# Try to invoke # Try to invoke
try: try:
activity_tool.invoke(m) activity_tool.invoke(m)
......
import threading from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
import copy
activity_runtime_environment_container = threading.local()
def getActivityRuntimeEnvironment(): def getActivityRuntimeEnvironment():
""" """
Raises AttributeError if called outside activity. Raises KeyError if called outside activity.
""" """
return copy.deepcopy(activity_runtime_environment_container.current) return getTransactionalVariable(None)['activity_runtime_environment']
def _getActivityRuntimeEnvironment(): def _getActivityRuntimeEnvironment():
current = getattr(activity_runtime_environment_container, 'current', None) try:
if current is None: return getActivityRuntimeEnvironment()
current = activity_runtime_environment_container.current = {} except KeyError:
return current return
def setActivityRuntimeValue(key, value):
"""
TODO: protect against unauthorized use ?
"""
_getActivityRuntimeEnvironment()[key] = value
def updateActivityRuntimeValue(new_dict): class BaseMessage:
"""
TODO: protect against unauthorized use ? delay = None
""" # None means infinite retry
_getActivityRuntimeEnvironment().update(new_dict) max_retry = 5
# For errors happening after message invocation (ConflictError),
# should we retry quickly without increasing 'retry' count ?
conflict_retry = True
class ActivityRuntimeEnvironment(object):
def clearActivityRuntimeEnvironment(): def __init__(self, message):
if getattr(activity_runtime_environment_container, 'current', None) is not None: self._message = message
delattr(activity_runtime_environment_container, 'current')
def edit(self, **kw):
# There is no point allowing to modify other attributes from a message
for k in kw:
getattr(BaseMessage, k)
self._message.__dict__.update(kw)
...@@ -48,6 +48,7 @@ from Products.ERP5Type.Globals import InitializeClass, DTMLFile ...@@ -48,6 +48,7 @@ from Products.ERP5Type.Globals import InitializeClass, DTMLFile
from Acquisition import aq_base from Acquisition import aq_base
from Acquisition import aq_inner from Acquisition import aq_inner
from ActivityBuffer import ActivityBuffer from ActivityBuffer import ActivityBuffer
from ActivityRuntimeEnvironment import BaseMessage
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
from BTrees.OIBTree import OIBTree from BTrees.OIBTree import OIBTree
...@@ -153,7 +154,8 @@ MESSAGE_NOT_EXECUTED = 0 ...@@ -153,7 +154,8 @@ MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1 MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2 MESSAGE_NOT_EXECUTABLE = 2
class Message:
class Message(BaseMessage):
"""Activity Message Class. """Activity Message Class.
Message instances are stored in an activity queue, inside the Activity Tool. Message instances are stored in an activity queue, inside the Activity Tool.
......
...@@ -32,6 +32,7 @@ import unittest ...@@ -32,6 +32,7 @@ import unittest
from Testing import ZopeTestCase from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import DummyMailHost from Products.ERP5Type.tests.utils import DummyMailHost
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\ from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
...@@ -49,7 +50,6 @@ from DateTime import DateTime ...@@ -49,7 +50,6 @@ from DateTime import DateTime
import cPickle as pickle import cPickle as pickle
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
import random import random
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment
import threading import threading
try: try:
...@@ -2802,44 +2802,32 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2802,44 +2802,32 @@ class TestCMFActivity(ERP5TypeTestCase):
delattr(Organisation, 'mustRunAfter') delattr(Organisation, 'mustRunAfter')
def CheckActivityRuntimeEnvironment(self, activity): def CheckActivityRuntimeEnvironment(self, activity):
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation') document = self.portal.organisation_module
get_transaction().commit() activity_result = []
self.tic()
activity_tool = self.getActivityTool()
check_result_dict = {}
initial_list_check_value = [1, 2]
def extractActivityRuntimeEnvironment(self): def extractActivityRuntimeEnvironment(self):
setActivityRuntimeValue('list_check', initial_list_check_value) activity_result.append(self.getActivityRuntimeEnvironment())
environment = self.getActivityRuntimeEnvironment() document.__class__.doSomething = extractActivityRuntimeEnvironment
check_result_dict['environment'] = environment
def runAndCheck():
check_result_dict.clear()
self.assertFalse('environment' in check_result_dict)
get_transaction().commit()
self.tic()
self.assertTrue('environment' in check_result_dict)
Organisation.extractActivityRuntimeEnvironment = extractActivityRuntimeEnvironment
try: try:
# Check that organisation.getActivityRuntimeEnvironment raises outside document.activate(activity=activity).doSomething()
# of activities. get_transaction().commit()
clearActivityRuntimeEnvironment() # Check that getActivityRuntimeEnvironment raises outside of activities
#organisation.getActivityRuntimeEnvironment() self.assertRaises(KeyError, document.getActivityRuntimeEnvironment)
self.assertRaises(AttributeError, organisation.getActivityRuntimeEnvironment)
# Check Runtime isolation # Check Runtime isolation
setActivityRuntimeValue('blah', True) self.tic()
organisation.activate(activity=activity).extractActivityRuntimeEnvironment() # Check that it still raises outside of activities
runAndCheck() self.assertRaises(KeyError, document.getActivityRuntimeEnvironment)
self.assertEqual(check_result_dict['environment'].get('blah'), None) # Check activity runtime environment instance
# Check Runtime presence env = activity_result.pop()
self.assertTrue(len(check_result_dict['environment']) > 0) self.assertFalse(activity_result)
self.assertTrue('processing_node' in check_result_dict['environment']) message = env._message
# Check Runtime does a deepcopy self.assertEqual(message.line.priority, 1)
self.assertTrue('list_check' in check_result_dict['environment']) self.assertEqual(message.object_path, document.getPhysicalPath())
check_result_dict['environment']['list_check'].append(3) self.assertTrue(message.conflict_retry) # default value
self.assertTrue(check_result_dict['environment']['list_check'] != \ env.edit(max_retry=0, conflict_retry=False)
initial_list_check_value) self.assertFalse(message.conflict_retry) # edited value
self.assertRaises(AttributeError, env.edit, foo='bar')
finally: finally:
delattr(Organisation, 'extractActivityRuntimeEnvironment') del document.__class__.doSomething
def test_104_activityRuntimeEnvironmentSQLDict(self, quiet=0, run=run_all_test): def test_104_activityRuntimeEnvironmentSQLDict(self, quiet=0, run=run_all_test):
if not run: return if not run: return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment