ActivityTool.py 16.5 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from Products.CMFCore import CMFCorePermissions
30
from Products.ERP5Type.Document.Folder import Folder
31 32
from Products.ERP5Type.Utils import getPath
from Products.ERP5Type.Error import Error
33
from Products.PythonScripts.Utility import allow_class
Jean-Paul Smets's avatar
Jean-Paul Smets committed
34 35
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthenticatedUser
36
from Globals import InitializeClass, DTMLFile, get_request
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37 38
from Acquisition import aq_base
from DateTime.DateTime import DateTime
39
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
40
from ActivityBuffer import ActivityBuffer
41
from AccessControl.SecurityManagement import newSecurityManager
Jean-Paul Smets's avatar
Jean-Paul Smets committed
42
import threading
43
import sys
Jean-Paul Smets's avatar
Jean-Paul Smets committed
44 45 46 47 48 49 50 51

from zLOG import LOG

# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
52
tic_lock = threading.Lock() # A RAM based lock
Jean-Paul Smets's avatar
Jean-Paul Smets committed
53 54 55 56 57 58 59 60 61 62 63 64 65 66

# Activity Registration
activity_dict = {}
activity_list = []

def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
  LOG('Init Activity', 0, str(activity.__name__))
  activity_instance = activity()
  activity_list.append(activity_instance)
  activity_dict[activity.__name__] = activity_instance

class Message:
67
  
68
  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
69 70 71 72
    if type(object) is type('a'):
      self.object_path = object.split('/')
    else:
      self.object_path = object.getPhysicalPath()
73 74 75 76 77 78
    if type(active_process) is type('a'):
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
79
      self.active_process_uid = active_process.getUid()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
80 81 82 83
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
84
    self.is_executed = 0
85 86
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?
Jean-Paul Smets's avatar
Jean-Paul Smets committed
87 88

  def __call__(self, activity_tool):
89
    try:
90 91
      LOG('WARNING ActivityTool', 0,
           'Trying to call method %s on object %s' % (self.method_id, self.object_path))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
92
      object = activity_tool.unrestrictedTraverse(self.object_path)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93
      # Change user if required (TO BE DONE)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
94
      activity_tool._v_active_process = self.active_process # Store the active_process as volatile thread variable
95 96 97 98 99 100 101
      # We will change the user only in order to execute this method
      current_user = str(_getAuthenticatedUser(self))
      uf = object.getPortalObject().acl_users
      user = uf.getUserById(self.user_name)
      if user is not None:
        user = user.__of__(uf)
        newSecurityManager(None, user)
102
      result = getattr(object, self.method_id)(*self.args, **self.kw)
103 104 105 106
      # Use again the previous user
      if user is not None:
        user = uf.getUserById(current_user).__of__(uf)
        newSecurityManager(None, user)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
107
      if activity_tool._v_active_process is not None:
108
        active_process = activity_tool.getActiveProcess()
109 110 111 112 113 114
        if isinstance(result,Error):
          result.edit(object_path=object)
          result.edit(method_id=self.method_id)
          active_process.activateResult(result) # XXX Allow other method_id in future
        else:
          active_process.activateResult(Error(object_path=object,method_id=self.method_id,result=result)) # XXX Allow other method_id in future
Jean-Paul Smets's avatar
Jean-Paul Smets committed
115
      self.is_executed = 1
116
    except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
117
      self.is_executed = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
118
      LOG('WARNING ActivityTool', 0,
119
           'Could not call method %s on object %s' % (self.method_id, self.object_path), error=sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
120 121 122 123

  def validate(self, activity, activity_tool):
    return activity.validate(activity_tool, self, **self.activity_kw)

124
  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
125 126 127 128 129 130
    #LOG('notifyUser begin', 0, str(self.user_name))
    user_email = activity_tool.portal_membership.getMemberById(self.user_name).getProperty('email')
    if user_email in ('', None):
      user_email = activity_tool.email_from_address
    #LOG('notifyUser user_email', 0, str(user_email))
    mail_text = """From: %s
131 132 133 134 135 136 137 138 139
To: %s
Subject: %s

%s

Document: %s
Method: %s
    """ % (activity_tool.email_from_address, user_email,
           message, message, '/'.join(self.object_path), self.method_id)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
140 141 142
    #LOG('notifyUser mail_text', 0, str(mail_text))
    activity_tool.MailHost.send( mail_text )
    #LOG('notifyUser send', 0, '')
143

Jean-Paul Smets's avatar
Jean-Paul Smets committed
144 145
class Method:

146
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
147 148
    self.__passive_self = passive_self
    self.__activity = activity
149
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
150 151 152 153
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
154
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
155 156
    activity_dict[self.__activity].queueMessage(self.__passive_self.portal_activities, m)

157 158
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
159 160
class ActiveWrapper:

161
  def __init__(self, passive_self, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
162 163
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
164
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
165 166 167 168
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
169
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
170 171
                  self.__dict__['__kw'], id)

172
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
173
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
174 175 176 177 178 179 180 181 182 183 184 185
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
186 187 188
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
189
    portal_type = 'Activity Tool'
190
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
191 192
    security = ClassSecurityInfo()

193 194
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
195 196
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
                     ,
197
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
198 199 200 201

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

202 203 204 205 206
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )

    def __init__(self):
        return Folder.__init__(self, ActivityTool.id)
207
    
208 209 210 211 212 213 214 215 216 217
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

Jean-Paul Smets's avatar
Jean-Paul Smets committed
218 219
    def initialize(self):
      global is_initialized
Sebastien Robin's avatar
Sebastien Robin committed
220
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
221 222 223 224 225
      # Initialize each queue
      for activity in activity_list:
        activity.initialize(self)
      is_initialized = 1

Jean-Paul Smets's avatar
Jean-Paul Smets committed
226 227 228 229 230 231 232 233 234 235
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
      if not is_initialized: self.initialize()

      # Call distribute on each queue
      for activity in activity_list:
236
        try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
237
          activity.distribute(self, node_count)
238
        except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
239 240
          LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
241
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
242
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
243 244
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
245
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
246 247 248 249
      """
      global active_threads, is_initialized

      # return if the number of threads is too high
Jean-Paul Smets's avatar
Jean-Paul Smets committed
250
      if active_threads >= max_active_threads:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
251 252
        if not force: return 'Too many threads'

Jean-Paul Smets's avatar
Jean-Paul Smets committed
253
      if tic_lock is None:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
254 255 256 257 258 259
        return

      # Initialize if needed
      if not is_initialized: self.initialize()

      # increase the number of active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
260
      tic_lock.acquire()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
261
      active_threads += 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
262
      tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
263 264 265

      # Wakeup each queue
      for activity in activity_list:
266
        try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
267
          activity.wakeup(self, processing_node)
268
        except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
269 270 271 272 273 274 275
          LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)

      # Process messages on each queue in round robin
      has_awake_activity = 1
      while has_awake_activity:
        has_awake_activity = 0
        for activity in activity_list:
276
          try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
277 278
            activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
            has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
279
          except:
280
            LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
281 282

      # decrease the number of active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
283
      tic_lock.acquire()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
284
      active_threads -= 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
285
      tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
286

287
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
288
      # Check in each queue if the object has deferred tasks
289 290 291 292 293
      # if not argument is provided, then check on self
      if len(args) > 0:
        object = args[0]
      else:
        object = self
Jean-Paul Smets's avatar
Jean-Paul Smets committed
294 295 296 297 298
      for activity in activity_list:
        if activity.hasActivity(self, object, **kw):
          return 1
      return 0

299
    def activate(self, object, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
300 301
      global is_initialized
      if not is_initialized: self.initialize()
302
      if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
303
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
304

305 306 307 308 309 310
    def deferredQueueMessage(self, activity, message):
      self._v_activity_buffer.deferredQueueMessage(self, activity, message)
      
    def deferredDeleteMessage(self, activity, message):
      self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
          
Jean-Paul Smets's avatar
Jean-Paul Smets committed
311
    def getRegisteredMessageList(self, activity):
312 313 314 315
      activity_buffer = getattr(self, '_v_activity_buffer', None)
      #if getattr(self, '_v_activity_buffer', None):
      if activity_buffer is not None:
        activity_buffer._register() # This is required if flush flush is called outside activate
316 317 318
        return activity.getRegisteredMessageList(self._v_activity_buffer, self)
      else:
        return []
Jean-Paul Smets's avatar
Jean-Paul Smets committed
319 320
          
    def unregisterMessage(self, activity, message):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
321
      self._v_activity_buffer._register() # Required if called by flush, outside activate
Jean-Paul Smets's avatar
Jean-Paul Smets committed
322
      return activity.unregisterMessage(self._v_activity_buffer, self, message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
323
          
Jean-Paul Smets's avatar
Jean-Paul Smets committed
324 325 326
    def flush(self, object, invoke=0, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
327 328 329 330 331
      if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
      if type(object) is type(()):
        object_path = object
      else:
        object_path = object.getPhysicalPath()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
332 333 334 335
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__)
        activity.flush(self, object_path, invoke=invoke, **kw)

336 337 338 339 340 341 342 343 344 345 346 347 348 349
    def start(self, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
        activity.start(self, **kw)

    def stop(self, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
        activity.stop(self, **kw)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
350 351 352
    def invoke(self, message):
      message(self)

353 354
    def newMessage(self, activity, path, active_process, activity_kw, method_id, *args, **kw):
      # Some Security Cheking should be made here XXX
Jean-Paul Smets's avatar
Jean-Paul Smets committed
355 356
      global is_initialized
      if not is_initialized: self.initialize()
357
      if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
358
      activity_dict[activity].queueMessage(self, Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
359 360 361 362 363 364 365

    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
366
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
367 368 369 370 371 372 373 374 375
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
376
      self.flush(object_path,method_id=method_id,invoke=0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
377 378 379 380 381 382 383 384
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    security.declarePublic('getMessageList')
    def getMessageList(self):
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
385 386 387
      # Initialize if needed
      if not is_initialized: self.initialize()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
388 389
      message_list = []
      for activity in activity_list:
Sebastien Robin's avatar
Sebastien Robin committed
390 391 392 393
        try:
          message_list += activity.getMessageList(self)
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
394 395
      return message_list

396
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
397
    def newActiveProcess(self, **kw):
398 399 400
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
401 402 403
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process
404 405 406 407 408

    def reindexObject(self):
      self.immediateReindexObject()

    def getActiveProcess(self):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
409 410 411
      active_process = getattr(self, '_v_active_process')
      if active_process:
        return self.unrestrictedTraverse(active_process)
412 413
      return None

414 415 416 417 418 419 420
    # Active synchronisation methods
    def validateOrder(self, message, validator_id, validation_value):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        method_id = "_validate_%s" % validator_id
        if hasattr(activity, method_id):
421
          LOG('CMFActivity: ', 0, 'validateOrder calling method_id %s' % method_id)
422 423 424
          if getattr(activity,method_id)(self, message, validation_value):
            return 1
      return 0
425 426 427 428 429 430 431 432

    # Required for tests (time shift)        
    def timeShift(self, delay):    
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        activity.timeShift(self, delay)
                              
433
InitializeClass(ActivityTool)