Commit ca687acf authored by Jérome Perrin's avatar Jérome Perrin

Use getattr instead of hasattr when checking ActivityBuffer.

Attach the traceback to failure notification messages.
Add some docstrings.



git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@6958 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0ae0218b
......@@ -30,13 +30,14 @@ import ExtensionClass
from AccessControl import ClassSecurityInfo
from Acquisition import aq_base
from ZODB.POSException import ConflictError
from Products.CMFCore.utils import getToolByName
try:
from Products.CMFCore import permissions
except ImportError:
from Products.CMFCore import CMFCorePermissions as permissions
from zLOG import LOG
from zLOG import LOG, WARNING
DEFAULT_ACTIVITY = 'SQLDict'
......@@ -45,16 +46,28 @@ DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
STOP_STATE = -4
POSITIVE_NODE_STATE = 'Positive Node State' # Special state which allows to select positive nodes
# Special state which allows to select positive nodes
POSITIVE_NODE_STATE = 'Positive Node State'
class ActiveObject(ExtensionClass.Base):
"""Active Object Mixin Class.
Active object are objects whose methods are lazilly evaluated in the
Activity Queue. To use an active object, you just have to call the
method on the wrapper returned by the `activate` method like this:
>>> obj.activate().aMethod()
This will defer the call to obj.aMethod()
"""
security = ClassSecurityInfo()
def activate(self, activity=DEFAULT_ACTIVITY, active_process=None, passive_commit=0,
activate_kw=None,**kw):
"""
Reserved Optional parameters
def activate(self, activity=DEFAULT_ACTIVITY, active_process=None,
passive_commit=0, activate_kw=None, **kw):
"""Returns an active wrapper for this object.
Reserved Optional parameters:
at_date -- request execution date for this activate call
......@@ -68,8 +81,17 @@ class ActiveObject(ExtensionClass.Base):
after_path -- never validate message if after_path
is in the list of path which are
going to be executed
going to be executed
after_path_and_method_id
-- never validate message if a message for
method_id on path is in the queue.
tag -- add a tag to a message
after_tag -- never validate message if there is a message
tagged with this tag.
"""
# Get activate values from activate_kw, then _v_activate_kw
# only if they are not set directly as arguments to activate()
......@@ -79,11 +101,11 @@ class ActiveObject(ExtensionClass.Base):
kw[key] = value
# This volatile variable '_v_activate_kw' can be used to pass parameters
# automatically to activate.
if hasattr(self, '_v_activate_kw'):
if getattr(self, '_v_activate_kw', None) is not None:
for key,value in self._v_activate_kw.items():
if not kw.has_key(key):
kw[key] = value
activity_tool = getattr(self, 'portal_activities', None)
activity_tool = getToolByName(self, 'portal_activities', None)
if activity_tool is None: return self # Do nothing if no portal_activities
# activate returns an ActiveWrapper
# a queue can be provided as well as extra parameters
......@@ -93,7 +115,8 @@ class ActiveObject(ExtensionClass.Base):
except ConflictError:
raise
except:
LOG("WARNING CMFActivity:",0, 'could not create activity for %s' % self.getRelativeUrl())
LOG("CMFActivity", WARNING,
'could not create activity for %s' % self.getRelativeUrl())
# If the portal_activities were not created
# return a passive object
if passive_commit: get_transaction().commit()
......@@ -101,18 +124,13 @@ class ActiveObject(ExtensionClass.Base):
security.declareProtected( permissions.ModifyPortalContent, 'flushActivity' )
def flushActivity(self, invoke=0, **kw):
activity_tool = getattr(self, 'portal_activities', None)
activity_tool = getToolByName(self, 'portal_activities', None)
if activity_tool is None: return # Do nothing if no portal_activities
# flush all activities related to this object
#try:
if 1:
activity_tool.flush(self, invoke=invoke, **kw)
#except:
# # If the portal_activities were not created
# # nothing to do
# pass
security.declareProtected( permissions.ModifyPortalContent, 'recursiveFlushActivity' )
activity_tool.flush(self, invoke=invoke, **kw)
security.declareProtected( permissions.ModifyPortalContent,
'recursiveFlushActivity' )
def recursiveFlushActivity(self, invoke=0, **kw):
# flush all activities related to this object
self.flushActivity(invoke=invoke, **kw)
......@@ -123,10 +141,9 @@ class ActiveObject(ExtensionClass.Base):
security.declareProtected( permissions.View, 'hasActivity' )
def hasActivity(self, **kw):
"""Tells if there is pending activities for this object.
"""
Tells if an object if active
"""
activity_tool = getattr(self, 'portal_activities', None)
activity_tool = getToolByName(self, 'portal_activities', None)
if activity_tool is None: return 0 # Do nothing if no portal_activities
try:
return activity_tool.hasActivity(self, **kw)
......@@ -139,20 +156,19 @@ class ActiveObject(ExtensionClass.Base):
security.declareProtected( permissions.View, 'hasErrorActivity' )
def hasErrorActivity(self, **kw):
"""
Tells if an object if active
"""Tells if there is failed activities for this object.
"""
return self.hasActivity(processing_node = INVOKE_ERROR_STATE)
security.declareProtected( permissions.View, 'hasInvalidActivity' )
def hasInvalidActivity(self, **kw):
"""
Tells if an object if active
"""Tells if there is invalied activities for this object.
"""
return self.hasActivity(processing_node = VALIDATE_ERROR_STATE)
security.declareProtected( permissions.View, 'getActiveProcess' )
def getActiveProcess(self):
activity_tool = getattr(self, 'portal_activities', None)
activity_tool = getToolByName(self, 'portal_activities', None)
if activity_tool is None: return None # Do nothing if no portal_activities
return self.portal_activities.getActiveProcess()
return activity_tool.getActiveProcess()
......@@ -26,27 +26,27 @@
#
##############################################################################
import socket, asyncore, urllib
import socket
import urllib
import traceback
import threading
import sys
from types import TupleType, StringType
import re
from Products.CMFCore import CMFCorePermissions
from Products.ERP5Type.Document.Folder import Folder
from Products.ERP5Type.Utils import getPath
from Products.ERP5Type.Error import Error
from Products.PythonScripts.Utility import allow_class
from App.ApplicationManager import ApplicationManager
from AccessControl import ClassSecurityInfo, Permissions
from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager
from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile, get_request
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
from Acquisition import aq_base
from DateTime.DateTime import DateTime
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from ActivityBuffer import ActivityBuffer
import threading
import sys
from ZODB.POSException import ConflictError
from OFS.Traversable import NotFound
from types import TupleType, StringType
import re
from zLOG import LOG, INFO, WARNING
......@@ -81,7 +81,10 @@ def registerActivity(activity):
activity_dict[activity.__name__] = activity_instance
class Message:
"""Activity Message Class.
Message instances are stored in an activity queue, inside the Activity Tool.
"""
def __init__(self, object, active_process, activity_kw, method_id, args, kw):
if type(object) is StringType:
self.object_path = object.split('/')
......@@ -105,9 +108,11 @@ class Message:
# Store REQUEST Info ?
def getObject(self, activity_tool):
"""return the object referenced in this message."""
return activity_tool.unrestrictedTraverse(self.object_path)
def getObjectList(self, activity_tool):
"""return the list of object that can be expanded from this message."""
try:
expand_method_id = self.activity_kw['expand_method_id']
obj = self.getObject(activity_tool)
......@@ -119,15 +124,22 @@ class Message:
return object_list
def hasExpandMethod(self):
"""return true if the message has an expand method.
An expand method is used to expand the list of objects and to turn a
big recursive transaction affecting many objects into multiple
transactions affecting only one object at a time (this can prevent
duplicated method calls)."""
return self.activity_kw.has_key('expand_method_id')
def changeUser(self, user_name, activity_tool):
"""restore the security context for the calling user."""
uf = activity_tool.getPortalObject().acl_users
user = uf.getUserById(user_name)
# if the user is not found, try to get it from a parent acl_users
# XXX this is still far from perfect, because we need to store all informations
# about the user (like original user folder, roles) to replay the activity with
# exactly the same security context as if it had been executed without activity.
# XXX this is still far from perfect, because we need to store all
# informations about the user (like original user folder, roles) to
# replay the activity with exactly the same security context as if
# it had been executed without activity.
if user is None:
uf = activity_tool.getPortalObject().aq_parent.acl_users
user = uf.getUserById(user_name)
......@@ -135,7 +147,8 @@ class Message:
user = user.__of__(uf)
newSecurityManager(None, user)
else :
LOG("CMFActivity", 0, "Unable to find user %s in the portal" % user_name)
LOG("CMFActivity", WARNING,
"Unable to find user %s in the portal" % user_name)
noSecurityManager()
return user
......@@ -145,14 +158,16 @@ class Message:
if isinstance(result,Error):
result.edit(object_path=object)
result.edit(method_id=self.method_id)
active_process.activateResult(result) # XXX Allow other method_id in future
# XXX Allow other method_id in future
active_process.activateResult(result)
else:
active_process.activateResult(Error(object_path=object,method_id=self.method_id,result=result)) # XXX Allow other method_id in future
active_process.activateResult(
Error(object_path=object,
method_id=self.method_id,
result=result)) # XXX Allow other method_id in future
def __call__(self, activity_tool):
try:
# LOG('WARNING ActivityTool', 0,
# 'Trying to call method %s on object %s' % (self.method_id, self.object_path))
obj = self.getObject(activity_tool)
# Change user if required (TO BE DONE)
# We will change the user only in order to execute this method
......@@ -168,19 +183,26 @@ class Message:
self.is_executed = 1
except:
self.is_executed = 0
self.exc_type = sys.exc_info()[0]
exc_info = sys.exc_info()
self.exc_type = exc_info[0]
self.traceback = exc_info[2]
LOG('WARNING ActivityTool', 0,
'Could not call method %s on object %s' % (self.method_id, self.object_path), error=sys.exc_info())
'Could not call method %s on object %s' % (
self.method_id, self.object_path), error=exc_info)
def validate(self, activity, activity_tool):
return activity.validate(activity_tool, self, **self.activity_kw)
def notifyUser(self, activity_tool, message="Failed Processing Activity"):
#LOG('notifyUser begin', 0, str(self.user_name))
user_email = activity_tool.portal_membership.getMemberById(self.user_name).getProperty('email')
"""Notify the user that the activity failed."""
portal = activity_tool.getPortalObject()
user_email = None
user = portal.portal_membership.getMemberById(self.user_name)
if user is not None:
user_email = user.getProperty('email')
if user_email in ('', None):
user_email = getattr(activity_tool, 'email_to_address', activity_tool.email_from_address)
#LOG('notifyUser user_email', 0, str(user_email))
user_email = portal.getProperty('email_to_address',
portal.getProperty('email_from_address'))
mail_text = """From: %s
To: %s
Subject: %s
......@@ -189,11 +211,12 @@ Subject: %s
Document: %s
Method: %s
""" % (activity_tool.email_from_address, user_email,
message, message, '/'.join(self.object_path), self.method_id)
#LOG('notifyUser mail_text', 0, str(mail_text))
Traceback:
%s
""" % (activity_tool.email_from_address, user_email,
message, message, '/'.join(self.object_path), self.method_id,
''.join(traceback.format_tb(self.traceback)))
activity_tool.MailHost.send( mail_text )
#LOG('notifyUser send', 0, '')
def reactivate(self, activity_tool):
# Reactivate the original object.
......@@ -570,30 +593,31 @@ class ActivityTool (Folder, UniqueObject):
# Check in each queue if the object has deferred tasks
# if not argument is provided, then check on self
if len(args) > 0:
object = args[0]
obj = args[0]
else:
object = self
obj = self
for activity in activity_list:
if activity.hasActivity(self, object, **kw):
if activity.hasActivity(self, obj, **kw):
return 1
return 0
def activate(self, object, activity, active_process, **kw):
global is_initialized
if not is_initialized: self.initialize()
if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
if getattr(self, '_v_activity_buffer', None) is None:
self._v_activity_buffer = ActivityBuffer()
return ActiveWrapper(object, activity, active_process, **kw)
def deferredQueueMessage(self, activity, message):
self._v_activity_buffer.deferredQueueMessage(self, activity, message)
def deferredDeleteMessage(self, activity, message):
if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
if getattr(self, '_v_activity_buffer', None) is None:
self._v_activity_buffer = ActivityBuffer()
self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
def getRegisteredMessageList(self, activity):
activity_buffer = getattr(self, '_v_activity_buffer', None)
#if getattr(self, '_v_activity_buffer', None):
if activity_buffer is not None:
activity_buffer._register() # This is required if flush flush is called outside activate
return activity.getRegisteredMessageList(self._v_activity_buffer, self)
......@@ -607,27 +631,25 @@ class ActivityTool (Folder, UniqueObject):
def flush(self, object, invoke=0, **kw):
global is_initialized
if not is_initialized: self.initialize()
if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
if getattr(self, '_v_activity_buffer', None) is None:
self._v_activity_buffer = ActivityBuffer()
if type(object) is TupleType:
object_path = object
else:
object_path = object.getPhysicalPath()
for activity in activity_list:
# LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__)
activity.flush(self, object_path, invoke=invoke, **kw)
def start(self, **kw):
global is_initialized
if not is_initialized: self.initialize()
for activity in activity_list:
# LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
activity.start(self, **kw)
def stop(self, **kw):
global is_initialized
if not is_initialized: self.initialize()
for activity in activity_list:
# LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
activity.stop(self, **kw)
def invoke(self, message):
......@@ -659,7 +681,6 @@ class ActivityTool (Folder, UniqueObject):
if 'group_method_id' in activity_kw:
del activity_kw['group_method_id']
active_obj = subobj.activate(**activity_kw)
#LOG('ActivityTool', 0, 'sub object %r has an alternate method %r, so invoking separately' % (subobj, alternate_method_id))
getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
else:
expanded_object_list.append(subobj)
......@@ -674,7 +695,6 @@ class ActivityTool (Folder, UniqueObject):
if 'group_method_id' in activity_kw:
del activity_kw['group_method_id']
active_obj = obj.activate(**activity_kw)
#LOG('ActivityTool', 0, 'object %r has an alternate method %r, so invoking separately' % (obj, alternate_method_id))
getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
else:
expanded_object_list.append(obj)
......@@ -684,7 +704,8 @@ class ActivityTool (Folder, UniqueObject):
m.is_executed = 0
m.exc_type = sys.exc_info()[0]
LOG('WARNING ActivityTool', 0,
'Could not call method %s on object %s' % (m.method_id, m.object_path), error=sys.exc_info())
'Could not call method %s on object %s' %
(m.method_id, m.object_path), error=sys.exc_info())
try:
if len(expanded_object_list) > 0:
......@@ -702,7 +723,8 @@ class ActivityTool (Folder, UniqueObject):
m.is_executed = 0
m.exc_type = sys.exc_info()[0]
LOG('WARNING ActivityTool', 0,
'Could not call method %s on objects %s' % (method_id, expanded_object_list), error=sys.exc_info())
'Could not call method %s on objects %s' %
(method_id, expanded_object_list), error=sys.exc_info())
else:
# Obtain all indices of failed messages. Note that this can be a partial failure.
failed_message_dict = {}
......@@ -718,7 +740,8 @@ class ActivityTool (Folder, UniqueObject):
if i in failed_message_dict:
m.is_executed = 0
LOG('ActivityTool', WARNING,
'the method %s partially failed on object %s' % (m.method_id, m.object_path,))
'the method %s partially failed on object %s' %
(m.method_id, m.object_path,))
else:
try:
m.activateResult(self, result, object)
......@@ -727,14 +750,18 @@ class ActivityTool (Folder, UniqueObject):
m.is_executed = 0
m.exc_type = sys.exc_info()[0]
LOG('ActivityTool', WARNING,
'Could not call method %s on object %s' % (m.method_id, m.object_path), error=sys.exc_info())
'Could not call method %s on object %s' % (
m.method_id, m.object_path), error=sys.exc_info())
def newMessage(self, activity, path, active_process, activity_kw, method_id, *args, **kw):
def newMessage(self, activity, path, active_process,
activity_kw, method_id, *args, **kw):
# Some Security Cheking should be made here XXX
global is_initialized
if not is_initialized: self.initialize()
if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
activity_dict[activity].queueMessage(self, Message(path, active_process, activity_kw, method_id, args, kw))
if getattr(self, '_v_activity_buffer', None) is None:
self._v_activity_buffer = ActivityBuffer()
activity_dict[activity].queueMessage(self,
Message(path, active_process, activity_kw, method_id, args, kw))
security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
def manageInvoke(self, object_path, method_id, REQUEST=None):
......@@ -745,7 +772,8 @@ class ActivityTool (Folder, UniqueObject):
object_path = tuple(object_path.split('/'))
self.flush(object_path,method_id=method_id,invoke=1)
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))
return REQUEST.RESPONSE.redirect('%s/%s' %
(self.absolute_url(), 'manageActivities'))
security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
def manageCancel(self, object_path, method_id, REQUEST=None):
......@@ -756,9 +784,11 @@ class ActivityTool (Folder, UniqueObject):
object_path = tuple(object_path.split('/'))
self.flush(object_path,method_id=method_id,invoke=0)
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))
return REQUEST.RESPONSE.redirect('%s/%s' %
(self.absolute_url(), 'manageActivities'))
security.declareProtected( CMFCorePermissions.ManagePortal, 'manageClearActivities' )
security.declareProtected( CMFCorePermissions.ManagePortal,
'manageClearActivities' )
def manageClearActivities(self, keep=1, REQUEST=None):
"""
Clear all activities and recreate tables.
......@@ -775,7 +805,9 @@ class ActivityTool (Folder, UniqueObject):
except ConflictError:
raise
except:
LOG('ActivityTool', WARNING, 'could not dump messages from %s' % (activity,), error=sys.exc_info())
LOG('ActivityTool', WARNING,
'could not dump messages from %s' %
(activity,), error=sys.exc_info())
if hasattr(folder, 'SQLDict_createMessageTable'):
try:
......@@ -783,8 +815,7 @@ class ActivityTool (Folder, UniqueObject):
except ConflictError:
raise
except:
LOG('CMFActivities',
WARNING,
LOG('CMFActivity', WARNING,
'could not drop the message table',
error=sys.exc_info())
folder.SQLDict_createMessageTable()
......@@ -795,8 +826,7 @@ class ActivityTool (Folder, UniqueObject):
except ConflictError:
raise
except:
LOG('CMFActivities',
WARNING,
LOG('CMFActivity', WARNING,
'could not drop the message queue table',
error=sys.exc_info())
folder.SQLQueue_createMessageTable()
......@@ -809,11 +839,12 @@ class ActivityTool (Folder, UniqueObject):
raise
except:
LOG('ActivityTool', WARNING,
'could not reactivate the message %r, %r' % (m.object_path, m.method_id),
error=sys.exc_info())
'could not reactivate the message %r, %r' %
(m.object_path, m.method_id), error=sys.exc_info())
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(),
'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
security.declarePublic('getMessageList')
def getMessageList(self,**kw):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment