Commit 05c3a1d1 authored by Sebastien Robin's avatar Sebastien Robin

- Fixed horrible bug : flush (called in manageInvoke) was

  deleting messages wich were not VALID, without execution !!
- Wrote a test to show that the bug is currently fixed.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13180 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 3d01d5c8
...@@ -159,6 +159,7 @@ class Queue: ...@@ -159,6 +159,7 @@ class Queue:
error=sys.exc_info()) error=sys.exc_info())
# Do not try to call methods on objects which cause errors # Do not try to call methods on objects which cause errors
return EXCEPTION return EXCEPTION
LOG('Queue.validate, return',0,'VALID')
return VALID return VALID
def isAwake(self, activity_tool, processing_node): def isAwake(self, activity_tool, processing_node):
......
...@@ -406,7 +406,6 @@ class SQLDict(RAMDict): ...@@ -406,7 +406,6 @@ class SQLDict(RAMDict):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
activity_tool.unregisterMessage(self, m)
#if not method_dict.has_key(method_id or m.method_id): #if not method_dict.has_key(method_id or m.method_id):
if not method_dict.has_key(m.method_id): if not method_dict.has_key(m.method_id):
method_dict[m.method_id] = 1 # Prevents calling invoke twice method_dict[m.method_id] = 1 # Prevents calling invoke twice
...@@ -423,6 +422,10 @@ class SQLDict(RAMDict): ...@@ -423,6 +422,10 @@ class SQLDict(RAMDict):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
activity_tool.unregisterMessage(self, m)
# Parse each message in SQL dict # Parse each message in SQL dict
result = readMessageList(path=path, method_id=method_id, result = readMessageList(path=path, method_id=method_id,
processing_node=None,include_processing=0) processing_node=None,include_processing=0)
...@@ -435,7 +438,6 @@ class SQLDict(RAMDict): ...@@ -435,7 +438,6 @@ class SQLDict(RAMDict):
# node and minimize network traffic with ZEO server # node and minimize network traffic with ZEO server
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
validate_value = m.validate(self, activity_tool) validate_value = m.validate(self, activity_tool)
...@@ -447,10 +449,14 @@ class SQLDict(RAMDict): ...@@ -447,10 +449,14 @@ class SQLDict(RAMDict):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path)) 'Could not evaluate %s on %s' % (m.method_id , path))
if validate_value is INVALID_PATH: elif validate_value is INVALID_PATH:
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw): def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
......
...@@ -222,7 +222,6 @@ class SQLQueue(RAMQueue): ...@@ -222,7 +222,6 @@ class SQLQueue(RAMQueue):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool) is VALID: if m.validate(self, activity_tool) is VALID:
...@@ -235,6 +234,7 @@ class SQLQueue(RAMQueue): ...@@ -235,6 +234,7 @@ class SQLQueue(RAMQueue):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
self.deleteMessage(activity_tool, m)
# def start(self, activity_tool, active_process=None): # def start(self, activity_tool, active_process=None):
# uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) # uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
......
...@@ -40,7 +40,7 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase ...@@ -40,7 +40,7 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\ from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
VALIDATE_ERROR_STATE VALIDATE_ERROR_STATE
from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
from Products.CMFActivity.Errors import ActivityPendingError from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
from Products.ERP5Type.Document.Organisation import Organisation from Products.ERP5Type.Document.Organisation import Organisation
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager
from zLOG import LOG from zLOG import LOG
...@@ -552,6 +552,37 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -552,6 +552,37 @@ class TestCMFActivity(ERP5TypeTestCase):
self.tic() self.tic()
self.assertEquals(o.getCorporateName(), 'cd') self.assertEquals(o.getCorporateName(), 'cd')
def TryFlushActivityWithAfterTag(self, activity):
"""
Ensure the order of an execution by a tag
"""
portal = self.getPortal()
organisation_module = self.getOrganisationModule()
if not organisation_module.hasContent(self.company_id):
organisation_module.newContent(id=self.company_id)
o = portal.organisation._getOb(self.company_id)
o.setTitle('?')
o.setDescription('?')
self.assertEquals(o.getTitle(), '?')
self.assertEquals(o.getDescription(), '?')
get_transaction().commit()
self.tic()
o.activate(after_tag = 'toto', activity = activity).setDescription('b')
o.activate(tag = 'toto', activity = activity).setTitle('a')
get_transaction().commit()
tool = self.getActivityTool()
self.assertRaises(ActivityFlushError,tool.manageInvoke,o.getPath(),'setDescription')
tool.manageInvoke(o.getPath(),'setTitle')
get_transaction().commit()
self.assertEquals(o.getTitle(), 'a')
self.assertEquals(o.getDescription(), '?')
self.tic()
self.assertEquals(len(tool.getMessageList()),0)
self.assertEquals(o.getTitle(), 'a')
self.assertEquals(o.getDescription(), 'b')
def CheckScheduling(self, activity): def CheckScheduling(self, activity):
""" """
Check if active objects with different after parameters are executed in a correct order Check if active objects with different after parameters are executed in a correct order
...@@ -1583,6 +1614,24 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -1583,6 +1614,24 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ', 0, message) LOG('Testing... ', 0, message)
self.TryErrorsWhileFinishingCommitDB('SQLQueue') self.TryErrorsWhileFinishingCommitDB('SQLQueue')
def test_74_TryFlushActivityWithAfterTagSQLDict(self, quiet=0, run=run_all_test):
# Test if after_tag can be used
if not run: return
if not quiet:
message = '\nTry Flus Activity With After Tag With SQL Dict'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryFlushActivityWithAfterTag('SQLDict')
def test_75_TryFlushActivityWithAfterTagWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if after_tag can be used
if not run: return
if not quiet:
message = '\nTry Flush Activity With After Tag With SQL Queue'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryFlushActivityWithAfterTag('SQLQueue')
if __name__ == '__main__': if __name__ == '__main__':
framework() framework()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment