Commit 7b634f22 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: remove dead code and simplify

parent 6338f120
...@@ -35,12 +35,6 @@ from cStringIO import StringIO ...@@ -35,12 +35,6 @@ from cStringIO import StringIO
import transaction import transaction
# Error values for message validation
EXCEPTION = -1
VALID = 0
INVALID_PATH = 1
INVALID_ORDER = 2
# Time global parameters # Time global parameters
MAX_PROCESSING_TIME = 900 # in seconds MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 15 # in seconds VALIDATION_ERROR_DELAY = 15 # in seconds
...@@ -96,52 +90,6 @@ class Queue(object): ...@@ -96,52 +90,6 @@ class Queue(object):
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
raise NotImplementedError raise NotImplementedError
def validate(self, activity_tool, message, check_order_validation=1, **kw):
"""
This is the place where activity semantics is implemented
**kw contains all parameters which allow to implement synchronisation,
constraints, delays, etc.
Standard synchronisation parameters:
after_method_id -- never validate message if after_method_id
is in the list of methods which are
going to be executed
after_message_uid -- never validate message if after_message_uid
is in the list of messages which are
going to be executed
after_path -- never validate message if after_path
is in the list of path which are
going to be executed
"""
try:
if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
# Do not try to call methods on objects which do not exist
LOG('CMFActivity', WARNING,
'Object %s does not exist' % '/'.join(message.object_path))
return INVALID_PATH
if check_order_validation:
for k, v in kw.iteritems():
if activity_tool.validateOrder(message, k, v):
return INVALID_ORDER
except ConflictError:
raise
except:
LOG('CMFActivity', WARNING,
'Validation of Object %s raised exception' % '/'.join(message.object_path),
error=sys.exc_info())
# Do not try to call methods on objects which cause errors
return EXCEPTION
return VALID
def getDependentMessageList(self, activity_tool, message):
message_list = []
for k, v in message.activity_kw.iteritems():
message_list += activity_tool.getDependentMessageList(message, k, v)
return message_list
def getExecutableMessageList(self, activity_tool, message, message_dict, def getExecutableMessageList(self, activity_tool, message, message_dict,
validation_text_dict, now_date=None): validation_text_dict, now_date=None):
"""Get messages which have no dependent message, and store them in the dictionary. """Get messages which have no dependent message, and store them in the dictionary.
...@@ -165,7 +113,7 @@ class Queue(object): ...@@ -165,7 +113,7 @@ class Queue(object):
cached_result = validation_text_dict.get(message.order_validation_text) cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None: if cached_result is None:
message_list = self.getDependentMessageList(activity_tool, message) message_list = activity_tool.getDependentMessageList(message)
transaction.commit() # Release locks. transaction.commit() # Release locks.
if message_list: if message_list:
# The result is not empty, so this message is not executable. # The result is not empty, so this message is not executable.
......
...@@ -40,7 +40,7 @@ from Products.CMFActivity.ActivityTool import ( ...@@ -40,7 +40,7 @@ from Products.CMFActivity.ActivityTool import (
Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage) Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import ( from Products.CMFActivity.ActivityRuntimeEnvironment import (
DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment) DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment)
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH from Queue import Queue, VALIDATION_ERROR_DELAY
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
# Stop validating more messages when this limit is reached # Stop validating more messages when this limit is reached
...@@ -743,17 +743,14 @@ class SQLBase(Queue): ...@@ -743,17 +743,14 @@ class SQLBase(Queue):
except AttributeError: except AttributeError:
pass pass
line = getattr(message, 'line', None) line = getattr(message, 'line', None)
validate_value = VALID if line and line.processing_node != -1 else \ if (line and line.processing_node != -1 or
message.validate(self, activity_tool) not activity_tool.getDependentMessageList(message)):
if validate_value == VALID:
# Try to invoke the message - what happens if invoke calls flushActivity ?? # Try to invoke the message - what happens if invoke calls flushActivity ??
with ActivityRuntimeEnvironment(message): with ActivityRuntimeEnvironment(message):
activity_tool.invoke(message) activity_tool.invoke(message)
if message.getExecutionState() != MESSAGE_EXECUTED: if message.getExecutionState() != MESSAGE_EXECUTED:
raise ActivityFlushError('Could not invoke %s on %s' raise ActivityFlushError('Could not invoke %s on %s'
% (message.method_id, path)) % (message.method_id, path))
elif validate_value is INVALID_PATH:
raise ActivityFlushError('The document %s does not exist' % path)
else: else:
raise ActivityFlushError('Could not validate %s on %s' raise ActivityFlushError('Could not validate %s on %s'
% (message.method_id, path)) % (message.method_id, path))
......
...@@ -367,11 +367,6 @@ class Message(BaseMessage): ...@@ -367,11 +367,6 @@ class Message(BaseMessage):
except: except:
self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool) self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
def validate(self, activity, activity_tool, check_order_validation=1):
return activity.validate(activity_tool, self,
check_order_validation=check_order_validation,
**self.activity_kw)
def notifyUser(self, activity_tool, retry=False): def notifyUser(self, activity_tool, retry=False):
"""Notify the user that the activity failed.""" """Notify the user that the activity failed."""
portal = activity_tool.getPortalObject() portal = activity_tool.getPortalObject()
...@@ -1553,22 +1548,17 @@ class ActivityTool (BaseTool): ...@@ -1553,22 +1548,17 @@ class ActivityTool (BaseTool):
REQUEST['RESPONSE'].redirect( 'manage_main' ) REQUEST['RESPONSE'].redirect( 'manage_main' )
return obj return obj
# Active synchronisation methods
security.declarePrivate('validateOrder')
def validateOrder(self, message, validator_id, validation_value):
message_list = self.getDependentMessageList(message, validator_id, validation_value)
return len(message_list) > 0
security.declarePrivate('getDependentMessageList') security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message, validator_id, validation_value): def getDependentMessageList(self, message):
message_list = [] message_list = []
method_id = "_validate_" + validator_id for validator_id, validation_value in message.activity_kw.iteritems():
for activity in activity_dict.itervalues(): method_id = "_validate_" + validator_id
method = getattr(activity, method_id, None) for activity in activity_dict.itervalues():
if method is not None: method = getattr(activity, method_id, None)
result = method(aq_inner(self), message, validation_value) if method is not None:
if result: result = method(self, message, validation_value)
message_list += [(activity, m) for m in result] if result:
message_list += [(activity, m) for m in result]
return message_list return message_list
# Required for tests (time shift) # Required for tests (time shift)
......
...@@ -555,37 +555,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -555,37 +555,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(o.getTitle(), 'a') self.assertEqual(o.getTitle(), 'a')
self.assertEqual(activity_tool.countMessageWithTag('toto'), 0) self.assertEqual(activity_tool.countMessageWithTag('toto'), 0)
def TryConflictErrorsWhileValidating(self, activity):
"""Try to execute active objects which may throw conflict errors
while validating, and check if they are still executed."""
o = self.getOrganisation()
# Monkey patch Queue to induce conflict errors artificially.
def validate(self, *args, **kwargs):
if Queue.current_num_conflict_errors < Queue.conflict_errors_limit:
Queue.current_num_conflict_errors += 1
# LOG('TryConflictErrorsWhileValidating', 0, 'causing a conflict error artificially')
raise ConflictError
return self.original_validate(*args, **kwargs)
from Products.CMFActivity.Activity.Queue import Queue
Queue.original_validate = Queue.validate
Queue.validate = validate
try:
# Test some range of conflict error occurences.
for i in xrange(10):
Queue.current_num_conflict_errors = 0
Queue.conflict_errors_limit = i
o.activate(activity = activity).getId()
self.commit()
self.flushAllActivities(silent = 1, loop_size = i + 10)
self.assertFalse(self.portal.portal_activities.getMessageList())
finally:
Queue.validate = Queue.original_validate
del Queue.original_validate
del Queue.current_num_conflict_errors
del Queue.conflict_errors_limit
def TryErrorsWhileFinishingCommitDB(self, activity): def TryErrorsWhileFinishingCommitDB(self, activity):
"""Try to execute active objects which may throw conflict errors """Try to execute active objects which may throw conflict errors
while validating, and check if they are still executed.""" while validating, and check if they are still executed."""
...@@ -1056,24 +1025,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1056,24 +1025,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del activity_tool.__class__.doSomething del activity_tool.__class__.doSomething
self.assertFalse(activity_tool.getMessageList()) self.assertFalse(activity_tool.getMessageList())
def test_72_TestConflictErrorsWhileValidatingWithSQLDict(self):
"""
Test if conflict errors spoil out active objects with SQLDict.
"""
self.TryConflictErrorsWhileValidating('SQLDict')
def test_73_TestConflictErrorsWhileValidatingWithSQLQueue(self):
"""
Test if conflict errors spoil out active objects with SQLQueue.
"""
self.TryConflictErrorsWhileValidating('SQLQueue')
def test_74_TestConflictErrorsWhileValidatingWithSQLJoblib(self):
"""
Test if conflict errors spoil out active objects with SQLJoblib.
"""
self.TryConflictErrorsWhileValidating('SQLJoblib')
def test_75_TestErrorsWhileFinishingCommitDBWithSQLDict(self): def test_75_TestErrorsWhileFinishingCommitDBWithSQLDict(self):
""" """
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment