Commit 8ccc40e0 authored by Jean-Paul Smets's avatar Jean-Paul Smets

New API for active process (start, stop, notify, result)


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@573 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 457a40f0
......@@ -34,9 +34,13 @@ from Acquisition import aq_base
from zLOG import LOG
DEFAULT_ACTIVITY = 'SQLDict'
#DEFAULT_ACTIVITY = 'ZODBDict'
#DEFAULT_ACTIVITY = 'RAMDict'
# Processing node are used to store processing state or processing node
DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
STOP_STATE = -4
POSITIVE_NODE_STATE = 'Positive Node State' # Special state which allows to select positive nodes
class ActiveObject(ExtensionClass.Base):
......@@ -95,6 +99,20 @@ class ActiveObject(ExtensionClass.Base):
# there can not be any activity
return 0
security.declareProtected( CMFCorePermissions.View, 'hasErrorActivity' )
def hasErrorActivity(self, **kw):
"""
Tells if an object if active
"""
return self.hasActivity(processing_node = INVOKE_ERROR_STATE)
security.declareProtected( CMFCorePermissions.View, 'hasInvalidActivity' )
def hasInvalidActivity(self, **kw):
"""
Tells if an object if active
"""
return self.hasActivity(processing_node = VALIDATE_ERROR_STATE)
security.declareProtected( CMFCorePermissions.View, 'getActiveProcess' )
def getActiveProcess(self):
activity_tool = getattr(self, 'portal_activities', None)
......
......@@ -33,6 +33,7 @@ from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet
from BTrees.IOBTree import IOBTree
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG
......@@ -98,7 +99,8 @@ class ActiveProcess(Base):
def postResult(self, result):
if not hasattr(self, 'result_list'):
self.result_list = IOBTree()
self.result_list[self._generateNewId()] = result
result.id = self._generateNewId()
self.result_list[result.id] = result
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList')
def getResultList(self, **kw):
......@@ -126,4 +128,42 @@ class ActiveProcess(Base):
#if callable(result):
# return self.activateResult(Result(self, 'activateResult',result())
security.declareProtected( CMFCorePermissions.View, 'hasActivity' )
def hasActivity(self, **kw):
"""
Tells if an object if active
"""
activity_tool = getattr(self, 'portal_activities', None)
if activity_tool is None: return 0 # Do nothing if no portal_activities
return activity_tool.hasActivity(None, active_process = self, **kw)
security.declareProtected( CMFCorePermissions.View, 'hasErrorActivity' )
def hasErrorActivity(self, **kw):
"""
Tells if an object if active
"""
return self.hasActivity(processing_node = INVOKE_ERROR_STATE)
security.declareProtected( CMFCorePermissions.View, 'hasInvalidActivity' )
def hasInvalidActivity(self, **kw):
"""
Tells if an object if active
"""
return self.hasActivity(processing_node = VALIDATE_ERROR_STATE)
def start():
# start activities related to this process
pass
def stop():
# stop activities related to this process
pass
def flush(self):
# flush activities related to this process
activity_tool = getattr(self, 'portal_activities', None)
if activity_tool is None: return # Do nothing if no portal_activities
return activity_tool.flush(None, active_process = self, invoke = 0) # FLush
InitializeClass( ActiveProcess )
......@@ -123,12 +123,20 @@ class Queue:
def isAwake(self, activity_tool, processing_node):
return self.is_awake[processing_node]
def hasActivity(self, activity_tool, object, **kw):
def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
return 0
def flush(self, activity_tool, object, **kw):
pass
def start(self, active_process=None):
# Start queue / activities in queue for given process
pass
def stop(self, active_process=None):
# Stop queue / activities in queue for given process
pass
def loadMessage(self, s):
return pickle.loads(s)
......@@ -137,3 +145,4 @@ class Queue:
def getMessageList(self, activity_tool, processing_node=None):
return []
......@@ -28,6 +28,7 @@
from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG
......@@ -57,7 +58,7 @@ class RAMDict(Queue):
return 0
return 1
def hasActivity(self, object, method_id=None, **kw):
def hasActivity(self, activity_tool, object, **kw):
object_path = object.getPhysicalPath()
for m in self.dict.values():
if m.object_path == object_path:
......
......@@ -28,6 +28,7 @@
from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
class RAMQueue(Queue):
"""
......@@ -49,7 +50,7 @@ class RAMQueue(Queue):
del self.queue[0]
return 0 # Keep on ticking
def hasActivity(self, object, method_id=None, **kw):
def hasActivity(self, activity_tool, object, **kw):
object_path = object.getPhysicalPath()
for m in self.queue:
if m.object_path == object_path:
......
......@@ -29,15 +29,12 @@
import random
from Products.CMFActivity.ActivityTool import registerActivity
from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG
MAX_PRIORITY = 5
DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
priority_weight = \
[1] * 64 + \
[2] * 20 + \
......@@ -60,6 +57,7 @@ class SQLDict(RAMDict):
method_id = m.method_id,
priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m))
# Also store uid of activity
def dequeueMessage(self, activity_tool, processing_node):
priority = random.choice(priority_weight)
......@@ -88,6 +86,7 @@ class SQLDict(RAMDict):
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
......@@ -100,8 +99,13 @@ class SQLDict(RAMDict):
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# Not more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
......@@ -120,9 +124,9 @@ class SQLDict(RAMDict):
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
def hasActivity(self, activity_tool, object, method_id=None, **kw):
def hasActivity(self, activity_tool, object, **kw):
my_object_path = '/'.join(object.getPhysicalPath())
result = activity_tool.SQLDict_hasMessage(path=my_object_path, method_id=method_id)
result = activity_tool.SQLDict_hasMessage(path=my_object_path, method_id=method_id, **kw)
if len(result) > 0:
return result[0].message_count > 0
return 0
......@@ -169,6 +173,14 @@ class SQLDict(RAMDict):
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing)
def start(self, activity_tool, active_process=None):
uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process)
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = DISTRIBUTABLE_STATE)
def stop(self, activity_tool, active_process=None):
uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process)
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = STOP_STATE)
def getMessageList(self, activity_tool, processing_node=None):
# YO: reading all lines might cause a deadlock
message_list = []
......
......@@ -29,15 +29,12 @@
import random
from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG
MAX_PRIORITY = 5
DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
priority_weight = \
[1] * 64 + \
[2] * 20 + \
......@@ -99,6 +96,7 @@ class SQLQueue(RAMQueue):
# This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
......@@ -108,9 +106,9 @@ class SQLQueue(RAMQueue):
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1
def hasActivity(self, activity_tool, object, method_id=None, **kw):
def hasActivity(self, activity_tool, object, **kw):
my_object_path = '/'.join(object.getPhysicalPath())
result = activity_tool.SQLQueue_hasMessage(path=my_object_path, method_id=method_id)
result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw)
if len(result) > 0:
return result[0].message_count > 0
return 0
......@@ -155,6 +153,14 @@ class SQLQueue(RAMQueue):
# Erase all messages in a single transaction
activity_tool.SQLQueue_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing)
def start(self, activity_tool, active_process=None):
uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = DISTRIBUTABLE_STATE)
def stop(self, activity_tool, active_process=None):
uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = STOP_STATE)
def getMessageList(self, activity_tool, processing_node=None):
message_list = []
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None)
......
......@@ -34,9 +34,9 @@ from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthentic
from Globals import InitializeClass, DTMLFile, get_request
from Acquisition import aq_base
from DateTime.DateTime import DateTime
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
import threading
from zLOG import LOG
# Using a RAM property (not a property of an instance) allows
......@@ -60,7 +60,7 @@ def registerActivity(activity):
class Result:
def __init__(self, object_or_path, method_id, result, title=None, id=None, message=None):
def __init__(self, object_or_path, method_id, result, log_title=None, log_id=None, log_message=None):
# Some utility function to do this would be useful since we use it everywhere XXX
if type(object_or_path) in (type([]), type(())):
url = '/'.join(object_or_path)
......@@ -71,12 +71,13 @@ class Result:
else:
path = object_or_path.getPhysicalPath()
url = '/'.join(path)
self.path = path
self.url = url
self.result = result # Include arbitrary result
self.title = title # Should follow Zope convention for LOG title
self.id = id # Should follow Zope convention for LOG ids
self.message = message # Should follow Zope convention for LOG message
self.object_path = path
self.object_url = url
self.method_id = method_id
self.result = result # Include arbitrary result
self.log_title = log_title # Should follow Zope convention for LOG title
self.log_id = log_id # Should follow Zope convention for LOG ids
self.log_message = log_message # Should follow Zope convention for LOG message
allow_class(Result)
......@@ -92,12 +93,14 @@ class Message:
self.active_process = None
else:
self.active_process = active_process.getPhysicalPath()
self.active_process_uid = active_process.getUid()
self.activity_kw = activity_kw
self.method_id = method_id
self.args = args
self.kw = kw
self.is_executed = 0
# User Info ? REQUEST Info ?
self.user_name = str(_getAuthenticatedUser(self))
# Store REQUEST Info ?
def __call__(self, activity_tool):
try:
......@@ -121,6 +124,20 @@ class Message:
def validate(self, activity, activity_tool):
return activity.validate(activity_tool, self, **self.activity_kw)
def notifyUser(self, activity_tool, message="Failed Processing Activity"):
user_email = activity_tool.portal_memberdata.getProperty('email')
mail_text = """From: %s
To: %s
Subject: %s
%s
Document: %s
Method: %s
""" % (activity_tool.email_from_address, user_email,
message, message, '/'.join(self.object_path), self.method_id)
activity_tool.MailHost.send( mail_text )
class Method:
def __init__(self, passive_self, activity, active_process, kw, method_id):
......@@ -293,6 +310,20 @@ class ActivityTool (Folder, UniqueObject):
LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__)
activity.flush(self, object_path, invoke=invoke, **kw)
def start(self, **kw):
global is_initialized
if not is_initialized: self.initialize()
for activity in activity_list:
LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
activity.start(self, **kw)
def stop(self, **kw):
global is_initialized
if not is_initialized: self.initialize()
for activity in activity_list:
LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
activity.stop(self, **kw)
def invoke(self, message):
message(self)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment