Commit bbde4298 authored by Vincent Pelletier's avatar Vincent Pelletier

Add run-time activity environment data container, and fill it from SQLDict and SQLQueue.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19405 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent fa4c15f8
...@@ -32,6 +32,7 @@ from Acquisition import aq_base ...@@ -32,6 +32,7 @@ from Acquisition import aq_base
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from Products.CMFCore.utils import getToolByName from Products.CMFCore.utils import getToolByName
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from ActivityRuntimeEnvironment import getActivityRuntimeEnvironment
try: try:
from Products.CMFCore import permissions from Products.CMFCore import permissions
...@@ -201,3 +202,5 @@ class ActiveObject(ExtensionClass.Base): ...@@ -201,3 +202,5 @@ class ActiveObject(ExtensionClass.Base):
key = ('default_activate_parameter', id(aq_base(self))) key = ('default_activate_parameter', id(aq_base(self)))
return tv.get(key) return tv.get(key)
def getActivityRuntimeEnvironment(self):
return getActivityRuntimeEnvironment()
...@@ -37,6 +37,7 @@ import sys ...@@ -37,6 +37,7 @@ import sys
from types import ClassType from types import ClassType
#from time import time #from time import time
from SQLBase import SQLBase from SQLBase import SQLBase
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -428,12 +429,19 @@ class SQLDict(RAMDict, SQLBase): ...@@ -428,12 +429,19 @@ class SQLDict(RAMDict, SQLBase):
if group_method_id is not None: if group_method_id is not None:
group_method_id = group_method_id.split('\0')[0] group_method_id = group_method_id.split('\0')[0]
message_list = [x[1] for x in message_uid_priority_list] message_list = [x[1] for x in message_uid_priority_list]
clearActivityRuntimeEnvironment()
if group_method_id not in (None, ""): if group_method_id not in (None, ""):
setActivityRuntimeValue('group_method_id', group_method_id)
method = activity_tool.invokeGroup method = activity_tool.invokeGroup
args = (group_method_id, message_list) args = (group_method_id, message_list)
else: else:
method = activity_tool.invoke method = activity_tool.invoke
args = (message_list[0], ) message = message_list[0]
args = (message, )
updateActivityRuntimeValue({'activity_kw': message.activity_kw,
'priority': message_uid_priority_list[0][2],
'uid': message_uid_priority_list[0][0]})
setActivityRuntimeValue('processing_node', processing_node)
# Commit right before executing messages. # Commit right before executing messages.
# As MySQL transaction do no start exactly at the same time as ZODB # As MySQL transaction do no start exactly at the same time as ZODB
# transactions but a bit later, messages available might be called # transactions but a bit later, messages available might be called
......
...@@ -38,6 +38,7 @@ import sys ...@@ -38,6 +38,7 @@ import sys
from time import time from time import time
from sets import ImmutableSet from sets import ImmutableSet
from SQLBase import SQLBase from SQLBase import SQLBase
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -273,6 +274,11 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -273,6 +274,11 @@ class SQLQueue(RAMQueue, SQLBase):
# everything needed from MySQL to get a fresh view of ZODB objects. # everything needed from MySQL to get a fresh view of ZODB objects.
get_transaction().commit() get_transaction().commit()
for value in message_uid_priority_list: for value in message_uid_priority_list:
clearActivityRuntimeEnvironment()
updateActivityRuntimeValue({'processing_node': processing_node,
'activity_kw': value[1].activity_kw,
'priority': value[2],
'uid': value[0]})
processed_message_uid_list.append(value) processed_message_uid_list.append(value)
# Try to invoke # Try to invoke
try: try:
......
import threading
import copy
activity_runtime_environment_container = threading.local()
def getActivityRuntimeEnvironment():
"""
Raises AttributeError if called outside activity.
"""
return copy.deepcopy(activity_runtime_environment_container.current)
def _getActivityRuntimeEnvironment():
current = getattr(activity_runtime_environment_container, 'current', None)
if current is None:
current = activity_runtime_environment_container.current = {}
return current
def setActivityRuntimeValue(key, value):
"""
TODO: protect against unauthorized use ?
"""
_getActivityRuntimeEnvironment()[key] = value
def updateActivityRuntimeValue(new_dict):
"""
TODO: protect against unauthorized use ?
"""
_getActivityRuntimeEnvironment().update(new_dict)
def clearActivityRuntimeEnvironment():
if getattr(activity_runtime_environment_container, 'current', None) is not None:
delattr(activity_runtime_environment_container, 'current')
...@@ -43,6 +43,7 @@ from DateTime import DateTime ...@@ -43,6 +43,7 @@ from DateTime import DateTime
import cPickle as pickle import cPickle as pickle
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
import random import random
from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -2677,6 +2678,63 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2677,6 +2678,63 @@ class TestCMFActivity(ERP5TypeTestCase):
finally: finally:
delattr(Organisation, 'mustRunBefore') delattr(Organisation, 'mustRunBefore')
delattr(Organisation, 'mustRunAfter') delattr(Organisation, 'mustRunAfter')
def CheckActivityRuntimeEnvironment(self, activity):
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
get_transaction().commit()
self.tic()
activity_tool = self.getActivityTool()
check_result_dict = {}
initial_list_check_value = [1, 2]
def extractActivityRuntimeEnvironment(self):
setActivityRuntimeValue('list_check', initial_list_check_value)
environment = self.getActivityRuntimeEnvironment()
check_result_dict['environment'] = environment
def runAndCheck():
check_result_dict.clear()
self.assertFalse('environment' in check_result_dict)
get_transaction().commit()
self.tic()
self.assertTrue('environment' in check_result_dict)
Organisation.extractActivityRuntimeEnvironment = extractActivityRuntimeEnvironment
try:
# Check that organisation.getActivityRuntimeEnvironment raises outside
# of activities.
clearActivityRuntimeEnvironment()
#organisation.getActivityRuntimeEnvironment()
self.assertRaises(AttributeError, organisation.getActivityRuntimeEnvironment)
# Check Runtime isolation
setActivityRuntimeValue('blah', True)
organisation.activate(activity=activity).extractActivityRuntimeEnvironment()
runAndCheck()
self.assertEqual(check_result_dict['environment'].get('blah'), None)
# Check Runtime presence
self.assertTrue(len(check_result_dict['environment']) > 0)
self.assertTrue('processing_node' in check_result_dict['environment'])
# Check Runtime does a deepcopy
self.assertTrue('list_check' in check_result_dict['environment'])
check_result_dict['environment']['list_check'].append(3)
self.assertTrue(check_result_dict['environment']['list_check'] != \
initial_list_check_value)
finally:
delattr(Organisation, 'extractActivityRuntimeEnvironment')
def test_104_activityRuntimeEnvironment(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck ActivityRuntimeEnvironment (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckActivityRuntimeEnvironment('SQLDict')
def test_105_TryChangeSkinInActivitySQLQueue(self, quiet=0, run=run_all_test):
if not run: return
if not quiet:
message = '\nCheck ActivityRuntimeEnvironment (SQLQueue)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.CheckActivityRuntimeEnvironment('SQLQueue')
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment