Commit 5a2621ab authored by Jean-Paul Smets's avatar Jean-Paul Smets

extended validation initial implementation


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1179 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent ad8b1ee3
...@@ -31,6 +31,12 @@ from Acquisition import aq_base ...@@ -31,6 +31,12 @@ from Acquisition import aq_base
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
from zLOG import LOG from zLOG import LOG
# Error values for message validation
EXCEPTION = -1
VALID = 0
INVALID_PATH = 1
INVALID_ORDER = 2
class Queue: class Queue:
""" """
Step 1: use lists Step 1: use lists
...@@ -109,22 +115,41 @@ class Queue: ...@@ -109,22 +115,41 @@ class Queue:
self.is_awake[processing_node] = 0 self.is_awake[processing_node] = 0
self.is_alive[processing_node] = 0 self.is_alive[processing_node] = 0
def validate(self, activity_tool, message, wait_for=None, **kw): def validate(self, activity_tool, message, **kw):
"""
This is the place where activity semantics is implemented
**kw contains all parameters which allow to implement synchronisation,
constraints, delays, etc.
Standard synchronisation parameters:
after_method_id -- never validate message if after_method_id
is in the list of methods which are
going to be executed
after_message_uid -- never validate message if after_message_uid
is in the list of messages which are
going to be executed
after_path -- never validate message if after_path
is in the list of path which are
going to be executed
"""
try: try:
if activity_tool.unrestrictedTraverse(message.object_path) is None: if activity_tool.unrestrictedTraverse(message.object_path) is None:
# Do not try to call methods on objects which do not exist # Do not try to call methods on objects which do not exist
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Object %s does not exist' % '/'.join(message.object_path)) 'Object %s does not exist' % '/'.join(message.object_path))
return 0 return INVALID_PATH
for k, v in kw.items():
if activity_tool.validateOrder(k, message, v):
return INVALID_ORDER
except: except:
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Object %s could not be accessed' % '/'.join(message.object_path)) 'Object %s could not be accessed' % '/'.join(message.object_path))
# Do not try to call methods on objects which cause errors # Do not try to call methods on objects which cause errors
return 0 return EXCEPTION
if wait_for is not None: return VALID
if wait_for():
return 0
return 1
def isAwake(self, activity_tool, processing_node): def isAwake(self, activity_tool, processing_node):
return self.is_awake[processing_node] return self.is_awake[processing_node]
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
############################################################################## ##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue from Queue import Queue, VALID
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG from zLOG import LOG
...@@ -79,7 +79,7 @@ class RAMDict(Queue): ...@@ -79,7 +79,7 @@ class RAMDict(Queue):
if len(self.getDict(activity_tool).keys()) is 0: if len(self.getDict(activity_tool).keys()) is 0:
return 1 # Go to sleep return 1 # Go to sleep
for key, m in self.getDict(activity_tool).items(): for key, m in self.getDict(activity_tool).items():
if m.validate(self, activity_tool): if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed: if m.is_executed:
del self.getDict(activity_tool)[key] del self.getDict(activity_tool)[key]
...@@ -110,7 +110,7 @@ class RAMDict(Queue): ...@@ -110,7 +110,7 @@ class RAMDict(Queue):
if not method_dict.has_key(m.method_id): if not method_dict.has_key(m.method_id):
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked if not m.is_executed: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
############################################################################## ##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue from Queue import Queue, VALID
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
class RAMQueue(Queue): class RAMQueue(Queue):
...@@ -63,7 +63,7 @@ class RAMQueue(Queue): ...@@ -63,7 +63,7 @@ class RAMQueue(Queue):
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
for m in self.getQueue(activity_tool): for m in self.getQueue(activity_tool):
if not m.validate(self, activity_tool): if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking return 0 # Keep on ticking
...@@ -89,7 +89,7 @@ class RAMQueue(Queue): ...@@ -89,7 +89,7 @@ class RAMQueue(Queue):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not m.validate(self, activity_tool): if m.validate(self, activity_tool) is not VALID:
activity_tool.unregisterMessage(self, m) # Trash messages which are not validated (no error handling) activity_tool.unregisterMessage(self, m) # Trash messages which are not validated (no error handling)
else: else:
if invoke: if invoke:
...@@ -101,7 +101,7 @@ class RAMQueue(Queue): ...@@ -101,7 +101,7 @@ class RAMQueue(Queue):
# Parse each message in queue # Parse each message in queue
for m in self.getQueue(activity_tool): for m in self.getQueue(activity_tool):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not m.validate(self, activity_tool): if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
else: else:
if invoke: if invoke:
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
import random import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
...@@ -123,11 +124,11 @@ class SQLDict(RAMDict): ...@@ -123,11 +124,11 @@ class SQLDict(RAMDict):
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
# Make sure object exists # Validate message (make sure object exists, priority OK, etc.)
if not m.validate(self, activity_tool): if m.validate(self, activity_tool) is not VALID:
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
if len(uid_list) > 0: if len(uid_list) > 0: # Add some delay here
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error #m.notifyUser(activity_tool) # Notify Error
...@@ -205,7 +206,7 @@ class SQLDict(RAMDict): ...@@ -205,7 +206,7 @@ class SQLDict(RAMDict):
method_dict[m.method_id] = 1 # Prevents calling invoke twice method_dict[m.method_id] = 1 # Prevents calling invoke twice
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked if not m.is_executed: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
...@@ -227,7 +228,7 @@ class SQLDict(RAMDict): ...@@ -227,7 +228,7 @@ class SQLDict(RAMDict):
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked if not m.is_executed: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
import random import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from Queue import VALID
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG from zLOG import LOG
...@@ -81,7 +82,7 @@ class SQLQueue(RAMQueue): ...@@ -81,7 +82,7 @@ class SQLQueue(RAMQueue):
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
# Make sure object exists # Make sure object exists
if not m.validate(self, activity_tool): if m.validate(self, activity_tool) is not VALID:
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
...@@ -160,7 +161,7 @@ class SQLQueue(RAMQueue): ...@@ -160,7 +161,7 @@ class SQLQueue(RAMQueue):
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked if not m.is_executed: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
......
...@@ -416,5 +416,15 @@ class ActivityTool (Folder, UniqueObject): ...@@ -416,5 +416,15 @@ class ActivityTool (Folder, UniqueObject):
return self.unrestrictedTraverse(active_process) return self.unrestrictedTraverse(active_process)
return None return None
# Active synchronisation methods
def validateOrder(self, message, validator_id, validation_value):
global is_initialized
if not is_initialized: self.initialize()
for activity in activity_list:
method_id = "_validate_%s" % validator_id
if hasattr(activity, method_id):
if getattr(activity,method_id)(self, message, validation_value):
return 1
return 0
InitializeClass(ActivityTool) InitializeClass(ActivityTool)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment