ActivityTool.py 41.7 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34 35
import re

Jean-Paul Smets's avatar
Jean-Paul Smets committed
36
from Products.CMFCore import CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.PythonScripts.Utility import allow_class
40
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
41 42 43 44
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
45 46
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
from Globals import InitializeClass, DTMLFile
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47
from Acquisition import aq_base
48
from Acquisition import aq_inner
49
from ActivityBuffer import ActivityBuffer
50
from zExceptions import ExceptionFormatter
51
from BTrees.OIBTree import OIBTree
52

53
from ZODB.POSException import ConflictError
54
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
55

56
from zLOG import LOG, INFO, WARNING
57 58

try:
59
  from Products.TimerService import getTimerService
60
except ImportError:
61 62
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
63

64
# minimal IP:Port regexp
65
NODE_RE = re.compile('^\d+\.\d+\.\d+\.\d+:\d+$')
66

Jean-Paul Smets's avatar
Jean-Paul Smets committed
67 68 69 70
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
Vincent Pelletier's avatar
Vincent Pelletier committed
71
is_initialized = False
72 73
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
74
first_run = True
75 76 77
currentNode = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
78 79 80 81

# Activity Registration
activity_dict = {}

82 83 84 85 86 87 88
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
global_activity_buffer_lock = allocate_lock()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
89 90 91
def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
92
  #LOG('Init Activity', 0, str(activity.__name__))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93 94 95 96
  activity_instance = activity()
  activity_dict[activity.__name__] = activity_instance

class Message:
97
  """Activity Message Class.
98

99 100
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
101 102
  def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
    if isinstance(obj, str):
103
      self.object_path = tuple(obj.split('/'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
104
    else:
105
      self.object_path = obj.getPhysicalPath()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
106
    if type(active_process) is StringType:
107 108 109 110 111
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
112
      self.active_process_uid = active_process.getUid()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
113 114 115 116
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
117
    self.is_executed = 0
118
    self.exc_info = (None, None, None)
119
    self.processing = None
120 121
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?
Jean-Paul Smets's avatar
Jean-Paul Smets committed
122

123
  def getObject(self, activity_tool):
124
    """return the object referenced in this message."""
125
    return activity_tool.unrestrictedTraverse(self.object_path)
126

127
  def getObjectList(self, activity_tool):
128
    """return the list of object that can be expanded from this message."""
129
    object_list = []
130
    try:
131
      object_list.append(self.getObject(activity_tool))
132
    except KeyError:
133 134 135 136 137 138
      pass
    else:
      if self.hasExpandMethod():
        expand_method_id = self.activity_kw['expand_method_id']
        # FIXME: how to pass parameters?
        object_list = getattr(object_list[0], expand_method_id)()
139
    return object_list
140

141
  def hasExpandMethod(self):
142 143 144 145 146
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
147
    return self.activity_kw.has_key('expand_method_id')
148

149
  def changeUser(self, user_name, activity_tool):
150
    """restore the security context for the calling user."""
151 152
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
153
    # if the user is not found, try to get it from a parent acl_users
154 155 156 157
    # XXX this is still far from perfect, because we need to store all
    # informations about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
158 159 160
    if user is None:
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
161 162 163
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
164
    else :
165 166
      LOG("CMFActivity", WARNING,
          "Unable to find user %s in the portal" % user_name)
167
      noSecurityManager()
168 169 170 171 172
    return user

  def activateResult(self, activity_tool, result, object):
    if self.active_process is not None:
      active_process = activity_tool.unrestrictedTraverse(self.active_process)
173
      if isinstance(result,ActiveResult):
174 175
        result.edit(object_path=object)
        result.edit(method_id=self.method_id)
176 177
        # XXX Allow other method_id in future
        active_process.activateResult(result)
178
      else:
179
        active_process.activateResult(
180
                    ActiveResult(object_path=object,
181 182
                          method_id=self.method_id,
                          result=result)) # XXX Allow other method_id in future
183

Jean-Paul Smets's avatar
Jean-Paul Smets committed
184
  def __call__(self, activity_tool):
185
    try:
186
      obj = self.getObject(activity_tool)
187
      old_security_manager = getSecurityManager()
188
      # Change user if required (TO BE DONE)
189
      # We will change the user only in order to execute this method
190
      user = self.changeUser(self.user_name, activity_tool)
191 192 193
      try:
        result = getattr(obj, self.method_id)(*self.args, **self.kw)
      finally:
194 195
        setSecurityManager(old_security_manager)

196
      self.activateResult(activity_tool, result, obj)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
197
      self.is_executed = 1
198
    except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
199
      self.is_executed = 0
200
      self.exc_info = sys.exc_info()
201
      LOG('ActivityTool', WARNING,
202
          'Could not call method %s on object %s' % (
203
          self.method_id, self.object_path), error=self.exc_info)
204
      # push the error in ZODB error_log
205
      if getattr(activity_tool, 'error_log', None) is not None:
206
        activity_tool.error_log.raising(self.exc_info)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
207

208 209 210 211 212 213 214
  def validate(self, activity, activity_tool, check_order_validation=1):
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def getDependentMessageList(self, activity, activity_tool):
    return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
215

216
  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
217 218 219 220 221 222
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
    user_email = None
    user = portal.portal_membership.getMemberById(self.user_name)
    if user is not None:
      user_email = user.getProperty('email')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
223
    if user_email in ('', None):
224 225
      user_email = portal.getProperty('email_to_address',
                       portal.getProperty('email_from_address'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
226
    mail_text = """From: %s
227 228 229 230 231 232 233
To: %s
Subject: %s

%s

Document: %s
Method: %s
234

235
%s
236
""" % (activity_tool.email_from_address, user_email, message,
237
       message, '/'.join(self.object_path), self.method_id,
238
       ''.join(ExceptionFormatter.format_exception(*self.exc_info)))
239 240
    try:
      activity_tool.MailHost.send( mail_text )
241 242 243 244
    except (socket.error, MailHostError):
      LOG('ActivityTool.notifyUser', WARNING, 'Mail containing failure information failed to be sent.', error=sys.exc_info())
      if self.exc_info[0] is not None:
        LOG('ActivityTool.notifyUser', WARNING, 'Original exception', error=self.exc_info)
245

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
  def reactivate(self, activity_tool):
    # Reactivate the original object.
    obj= self.getObject(activity_tool)
    # Change user if required (TO BE DONE)
    # We will change the user only in order to execute this method
    current_user = str(_getAuthenticatedUser(self))
    user = self.changeUser(self.user_name, activity_tool)
    try:
      active_obj = obj.activate(**self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      if user is not None:
        self.changeUser(current_user, activity_tool)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
261 262
class Method:

263
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
264 265
    self.__passive_self = passive_self
    self.__activity = activity
266
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
267 268 269 270
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
271
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
272 273
    activity_dict[self.__activity].queueMessage(self.__passive_self.portal_activities, m)

274 275
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
276 277
class ActiveWrapper:

278
  def __init__(self, passive_self, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
279 280
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
281
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
282 283 284 285
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
286
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
287 288
                  self.__dict__['__kw'], id)

289
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
290
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
291 292 293 294 295 296 297 298 299 300 301 302
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
303 304 305
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
306
    portal_type = 'Activity Tool'
307
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
308 309
    security = ClassSecurityInfo()

310 311
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
312
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
313
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
314
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
315
                     ,
316
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
317 318 319 320

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

321 322 323
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

324 325
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
326 327 328 329 330 331
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
332 333 334

    def __init__(self):
        return Folder.__init__(self, ActivityTool.id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
335

336 337 338 339 340 341 342 343 344 345
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

Jean-Paul Smets's avatar
Jean-Paul Smets committed
346 347
    def initialize(self):
      global is_initialized
Sebastien Robin's avatar
Sebastien Robin committed
348
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
349
      # Initialize each queue
350
      for activity in activity_dict.itervalues():
Jean-Paul Smets's avatar
Jean-Paul Smets committed
351
        activity.initialize(self)
Vincent Pelletier's avatar
Vincent Pelletier committed
352
      is_initialized = True
353 354 355
      
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
Aurel's avatar
Aurel committed
356
        """
357 358 359 360 361 362 363 364 365 366 367 368
        return True, if we are subscribed to TimerService.
        Otherwise return False.
        """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False
        
        path = '/'.join(self.getPhysicalPath())
        if path in service.lisSubscriptions():
            return True
        return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
369

370
    security.declareProtected(Permissions.manage_properties, 'subscribe')
371
    def subscribe(self, REQUEST=None, RESPONSE=None):
372 373
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
374
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
375
        if not service:
376
            LOG('ActivityTool', INFO, 'TimerService not available')
377 378 379 380
            url += urllib.quote('TimerService not available')
        else:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
381 382
        if RESPONSE is not None:
            RESPONSE.redirect(url)
383 384

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
385
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
386 387
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
388
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
389
        if not service:
390
            LOG('ActivityTool', INFO, 'TimerService not available')
391 392 393 394
            url += urllib.quote('TimerService not available')
        else:
            service.unsubscribe(self)
            url += urllib.quote("Unsubscribed from Timer Service")
395 396
        if RESPONSE is not None:
            RESPONSE.redirect(url)
397 398 399

    def manage_beforeDelete(self, item, container):
        self.unsubscribe()
400 401
        Folder.inheritedAttribute('manage_beforeDelete')(self, item, container)
    
402 403
    def manage_afterAdd(self, item, container):
        self.subscribe()
404 405
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
       
406 407
    def getCurrentNode(self):
        """ Return current node in form ip:port """
408 409 410 411 412 413 414 415 416 417 418 419 420
        global currentNode
        if currentNode is None:
          port = ''
          from asyncore import socket_map
          for k, v in socket_map.items():
              if hasattr(v, 'port'):
                  # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                  type = str(getattr(v, '__class__', 'unknown'))
                  if type == 'ZServer.HTTPServer.zhttp_server':
                      port = v.port
                      break
          ip = socket.gethostbyname(socket.gethostname())
          currentNode = '%s:%s' %(ip, port)
421 422 423 424 425 426 427
        return currentNode
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the distributingNode """
        return self.distributingNode

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464
    def getNodeList(self, role=None):
      node_dict = self.getNodeDict()
      if role is None:
        result = [x for x in node_dict.keys()]
      else:
        result = [node_id for node_id, node_role in node_dict.items() if node_role == role]
      result.sort()
      return result

    def getNodeDict(self):
      nodes = self._nodes
      if isinstance(nodes, tuple):
        new_nodes = OIBTree()
        new_nodes.update([(x, ROLE_PROCESSING) for x in self._nodes])
        self._nodes = nodes = new_nodes
      return nodes

    def registerNode(self, node):
      node_dict = self.getNodeDict()
      if not node_dict.has_key(node):
        if len(node_dict) == 0: # If we are registering the first node, make
                                # it both the distributing node and a processing
                                # node.
          role = ROLE_PROCESSING
          self.distributingNode = node
        else:
          role = ROLE_IDLE
        self.updateNode(node, role)

    def updateNode(self, node, role):
      node_dict = self.getNodeDict()
      node_dict[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      return self.getNodeList(role=ROLE_PROCESSING)

465
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
466 467
    def getIdleNodeList(self):
      return self.getNodeList(role=ROLE_IDLE)
468

469 470 471 472
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
      return isinstance(node_name, str) and NODE_RE.match(node_name)
      
473 474
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
475
        """ set the distributing node """   
476
        if not distributingNode or self._isValidNodeName(distributingNode):
477 478 479 480 481 482 483 484 485 486 487 488 489
          self.distributingNode = distributingNode
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Distributing Node successfully changed."))
        else :
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Malformed Distributing Node."))

490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes """
      processing_node = self.getDistributingNode()
      updated_processing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == processing_node:
            self.processing_node = ''
            updated_processing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if updated_processing_node:
          message += "Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Change one or more idle nodes into processing nodes """
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now procesing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Change one or more procesing nodes into idle nodes """
      if processing_node_list is not None:
        node_dict = self.getNodeDict()
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is not None:
        if processing_node_list is None:
          message = "No used node selected, nothing done."
        else:
          message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))
548

549
    def process_timer(self, tick, interval, prev="", next=""):
550
        """
551 552 553 554 555
        Call distribute() if we are the Distributing Node and call tic()
        with our node number.
        This method is called by TimerService in the interval given
        in zope.conf. The Default is every 5 seconds.
        """
556 557 558 559
        # Prevent TimerService from starting multiple threads in parallel
        acquired = timerservice_lock.acquire(0)
        if not acquired:
          return
560

561
        try:
562
          old_sm = getSecurityManager()
563
          try:
564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
            try:
              # get owner of portal_catalog, so normally we should be able to
              # have the permission to invoke all activities
              user = self.portal_catalog.getWrappedOwner()
              newSecurityManager(self.REQUEST, user)

              currentNode = self.getCurrentNode()
              self.registerNode(currentNode)
              processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

              # only distribute when we are the distributingNode or if it's empty
              if (self.getDistributingNode() == currentNode):
                self.distribute(len(processing_node_list))

              # SkinsTool uses a REQUEST cache to store skin objects, as
              # with TimerService we have the same REQUEST over multiple
              # portals, we clear this cache to make sure the cache doesn't
              # contains skins from another portal.
              stool = getToolByName(self, 'portal_skins', None)
              if stool is not None:
                stool.changeSkin(None)

              # call tic for the current processing_node
              # the processing_node numbers are the indices of the elements in the node tuple +1
              # because processing_node starts form 1
              if currentNode in processing_node_list:
                self.tic(processing_node_list.index(currentNode)+1)
            except:
              # Catch ALL exception to avoid killing timerserver.
              LOG('ActivityTool', ERROR, 'process_timer received an exception', error=sys.exc_info())
          finally:
            setSecurityManager(old_sm)
Jérome Perrin's avatar
Jérome Perrin committed
596
        finally:
597
          timerservice_lock.release()
598

Jean-Paul Smets's avatar
Jean-Paul Smets committed
599 600 601 602 603 604
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
605 606
      if not is_initialized:
        self.initialize()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
607 608

      # Call distribute on each queue
609
      for activity in activity_dict.itervalues():
610
        activity.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
611

Jean-Paul Smets's avatar
Jean-Paul Smets committed
612
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
613
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
614 615
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
616
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
617
      """
Vincent Pelletier's avatar
Vincent Pelletier committed
618
      global active_threads, first_run
Jean-Paul Smets's avatar
Jean-Paul Smets committed
619 620

      # return if the number of threads is too high
621
      # else, increase the number of active_threads and continue
622 623
      tic_lock.acquire()
      too_many_threads = (active_threads >= max_active_threads)
624
      if not too_many_threads or force:
625
        active_threads += 1
626 627 628
      else:
        tic_lock.release()
        raise RuntimeError, 'Too many threads'
629
      tic_lock.release()
630

Jean-Paul Smets's avatar
Jean-Paul Smets committed
631
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
632 633
      if not is_initialized:
        self.initialize()
634

635
      inner_self = aq_inner(self)
636

637 638 639
      # If this is the first tic after zope is started, reset the processing
      # flag for activities of this node
      if first_run:
640 641 642 643
        inner_self.SQLDict_clearProcessingFlag(
                                processing_node=processing_node)
        inner_self.SQLQueue_clearProcessingFlag(
                                processing_node=processing_node)
644
        first_run = False
645

646 647
      try:
        # Wakeup each queue
648
        for activity in activity_dict.itervalues():
649
          activity.wakeup(inner_self, processing_node)
650

651 652 653 654
        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
655
          for activity in activity_dict.itervalues():
656 657
            activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
            has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
658 659 660 661 662
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        active_threads -= 1
        tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
663

664
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
665
      # Check in each queue if the object has deferred tasks
666 667
      # if not argument is provided, then check on self
      if len(args) > 0:
668
        obj = args[0]
669
      else:
670
        obj = self
671
      for activity in activity_dict.itervalues():
672
        if activity.hasActivity(aq_inner(self), obj, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
673 674 675
          return 1
      return 0

676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activtity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionaly created if non existant because
        chances are it will be used in the instance life.
        Lock is held when checking for intermediate level existance
        because:
         - intermediate level dict must not be created in 2 threads at the
           same time, since one creation would destroy the existing one.
        It's released after that step because:
         - lower level access is at thread scope, thus by definition there
           can be only one access at a time to a key
         - GIL protects us when accessing python instances
      """
      global global_activity_buffer
      global global_activity_buffer_lock
      assert getattr(self, 'aq_self', None) is not None
      my_instance_key = self.getPhysicalPath()
      my_thread_key = get_ident()
      global_activity_buffer_lock.acquire()
      try:
        if my_instance_key not in global_activity_buffer:
          global_activity_buffer[my_instance_key] = {}
      finally:
        global_activity_buffer_lock.release()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      if my_thread_key not in thread_activity_buffer:
        if create_if_not_found:
          buffer = ActivityBuffer(activity_tool=self)
        else:
          buffer = None
        thread_activity_buffer[my_thread_key] = buffer
      activity_buffer = thread_activity_buffer[my_thread_key]
      return activity_buffer

713 714
    security.declarePrivate('activateObject')
    def activateObject(self, object, activity, active_process, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
715 716
      if not is_initialized:
        self.initialize()
717
      self.getActivityBuffer()
718
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
719

720
    def deferredQueueMessage(self, activity, message):
721 722
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
723

724
    def deferredDeleteMessage(self, activity, message):
725 726
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
727

Jean-Paul Smets's avatar
Jean-Paul Smets committed
728
    def getRegisteredMessageList(self, activity):
729
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
730
      if activity_buffer is not None:
731 732
        #activity_buffer._register() # This is required if flush flush is called outside activate
        return activity.getRegisteredMessageList(activity_buffer,
733
                                                 aq_inner(self))
734 735
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
736

Jean-Paul Smets's avatar
Jean-Paul Smets committed
737
    def unregisterMessage(self, activity, message):
738 739 740
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
741

742
    def flush(self, obj, invoke=0, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
743 744
      if not is_initialized:
        self.initialize()
745
      self.getActivityBuffer()
746 747
      if isinstance(obj, tuple):
        object_path = obj
748
      else:
749
        object_path = obj.getPhysicalPath()
750
      for activity in activity_dict.itervalues():
751
        activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
752

753
    def start(self, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
754 755
      if not is_initialized:
        self.initialize()
756
      for activity in activity_dict.itervalues():
757
        activity.start(aq_inner(self), **kw)
758 759

    def stop(self, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
760 761
      if not is_initialized:
        self.initialize()
762
      for activity in activity_dict.itervalues():
763
        activity.stop(aq_inner(self), **kw)
764

Jean-Paul Smets's avatar
Jean-Paul Smets committed
765
    def invoke(self, message):
766 767
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extrach base objects.
768
        base_chain = [aq_base(x) for x in self.aq_chain]
769 770 771 772 773 774 775 776
        # Grab existig request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # runing unit tests. Recreate it if it does not exist.
        parents = getattr(request, 'PARENTS', None)
        if parents is None:
          LOG('CMFActivity.ActivityTool.invoke', INFO, 'PARENTS is not defined in REQUEST. It should only happen in unit tests.')
777
          request['PARENTS'] = self.aq_chain[:]
778 779 780 781 782 783 784 785 786 787
        new_request_container = request_container.__class__(REQUEST=request.clone())
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      message(my_self)
788 789 790
      if my_self is not self: # We rewrapped self
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
791

792 793 794 795 796 797 798 799 800
    def invokeGroup(self, method_id, message_list):
      # Invoke a group method.
      object_list = []
      expanded_object_list = []
      new_message_list = []
      path_dict = {}
      # Filter the list of messages. If an object is not available, ignore such a message.
      # In addition, expand an object if necessary, and make sure that no duplication happens.
      for m in message_list:
801 802
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
803 804
        try:
          obj = m.getObject(self)
805
          i = len(new_message_list) # This is an index of this message in new_message_list.
806
          if m.hasExpandMethod():
807 808
            for subobj in m.getObjectList(self):
              path = subobj.getPath()
809
              if path not in path_dict:
810
                path_dict[path] = i
811 812 813 814 815 816
                if alternate_method_id is not None \
                   and hasattr(aq_base(subobj), alternate_method_id):
                  # if this object is alternated, generate a new single active object.
                  activity_kw = m.activity_kw.copy()
                  if 'group_method_id' in activity_kw:
                    del activity_kw['group_method_id']
817 818
                  if 'group_id' in activity_kw:
                    del activity_kw['group_id']                    
819 820 821 822
                  active_obj = subobj.activate(**activity_kw)
                  getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
                else:
                  expanded_object_list.append(subobj)
823 824 825
          else:
            path = obj.getPath()
            if path not in path_dict:
826
              path_dict[path] = i
827 828 829 830 831 832
              if alternate_method_id is not None \
                  and hasattr(aq_base(obj), alternate_method_id):
                # if this object is alternated, generate a new single active object.
                activity_kw = m.activity_kw.copy()
                if 'group_method_id' in activity_kw:
                  del activity_kw['group_method_id']
833 834
                if 'group_id' in activity_kw:
                  del activity_kw['group_id']
835 836 837 838
                active_obj = obj.activate(**activity_kw)
                getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
              else:
                expanded_object_list.append(obj)
839
          object_list.append(obj)
840 841 842
          new_message_list.append(m)
        except:
          m.is_executed = 0
843
          m.exc_info = sys.exc_info()
844
          LOG('WARNING ActivityTool', 0,
845
              'Could not call method %s on object %s' %
846
              (m.method_id, m.object_path), error=m.exc_info)
847

848 849
      try:
        if len(expanded_object_list) > 0:
850 851
          method = self.unrestrictedTraverse(method_id)
          # FIXME: how to apply security here?
852 853
          # NOTE: expanded_object_list must be set to failed objects by the callee.
          #       If it fully succeeds, expanded_object_list must be empty when returning.
854
          result = method(expanded_object_list, **m.kw)
855
        else:
856 857 858 859 860
          result = None
      except:
        # In this case, the group method completely failed.
        for m in new_message_list:
          m.is_executed = 0
861
          m.exc_info = sys.exc_info()
862
        LOG('WARNING ActivityTool', 0,
863
            'Could not call method %s on objects %s' %
864
            (method_id, expanded_object_list), error=m.exc_info)
865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
      else:
        # Obtain all indices of failed messages. Note that this can be a partial failure.
        failed_message_dict = {}
        for obj in expanded_object_list:
          path = obj.getPath()
          i = path_dict[path]
          failed_message_dict[i] = None

        # Only for succeeded messages, an activity process is invoked (if any).
        for i in xrange(len(object_list)):
          object = object_list[i]
          m = new_message_list[i]
          if i in failed_message_dict:
            m.is_executed = 0
            LOG('ActivityTool', WARNING,
880 881
                'the method %s partially failed on object %s' %
                (m.method_id, m.object_path,))
882 883 884 885 886
          else:
            try:
              m.activateResult(self, result, object)
              m.is_executed = 1
            except:
887
              m.is_executed = 0
888
              m.exc_info = sys.exc_info()
889
              LOG('ActivityTool', WARNING,
890
                  'Could not call method %s on object %s' % (
891
                  m.method_id, m.object_path), error=m.exc_info)
892

893 894
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
895
      # Some Security Cheking should be made here XXX
Vincent Pelletier's avatar
Vincent Pelletier committed
896 897
      if not is_initialized:
        self.initialize()
898
      self.getActivityBuffer()
899
      activity_dict[activity].queueMessage(aq_inner(self),
900
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
901

902
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
903 904 905 906 907 908
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
909
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
910
      if REQUEST is not None:
911 912
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
913

914
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
915 916 917 918 919 920
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
921
      self.flush(object_path,method_id=method_id,invoke=0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
922
      if REQUEST is not None:
923 924
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
925

926 927
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
928
    def manageClearActivities(self, keep=1, REQUEST=None):
929 930 931 932 933
      """
        Clear all activities and recreate tables.
      """
      folder = getToolByName(self, 'portal_skins').activity

934 935
      # Obtain all pending messages.
      message_list = []
936
      if keep:
937
        for activity in activity_dict.itervalues():
938 939 940 941 942 943
          if hasattr(activity, 'dumpMessageList'):
            try:
              message_list.extend(activity.dumpMessageList(self))
            except ConflictError:
              raise
            except:
944 945 946
              LOG('ActivityTool', WARNING,
                  'could not dump messages from %s' %
                  (activity,), error=sys.exc_info())
947 948

      if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
949 950 951 952 953
        try:
          folder.SQLDict_dropMessageTable()
        except ConflictError:
          raise
        except:
954
          LOG('CMFActivity', WARNING,
955
              'could not drop the message table',
956 957 958
              error=sys.exc_info())
        folder.SQLDict_createMessageTable()

959
      if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
960 961 962 963 964
        try:
          folder.SQLQueue_dropMessageTable()
        except ConflictError:
          raise
        except:
965
          LOG('CMFActivity', WARNING,
966
              'could not drop the message queue table',
967 968 969
              error=sys.exc_info())
        folder.SQLQueue_createMessageTable()

970 971 972
      # Reactivate the messages.
      for m in message_list:
        try:
973
          m.reactivate(aq_inner(self))
974 975 976 977
        except ConflictError:
          raise
        except:
          LOG('ActivityTool', WARNING,
978 979
              'could not reactivate the message %r, %r' %
              (m.object_path, m.method_id), error=sys.exc_info())
980

981
      if REQUEST is not None:
982 983
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(),
          'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
984

Jean-Paul Smets's avatar
Jean-Paul Smets committed
985
    security.declarePublic('getMessageList')
986
    def getMessageList(self,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
987 988 989
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
990
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
991 992
      if not is_initialized:
        self.initialize()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
993

Jean-Paul Smets's avatar
Jean-Paul Smets committed
994
      message_list = []
995
      for activity in activity_dict.itervalues():
Sebastien Robin's avatar
Sebastien Robin committed
996
        try:
997
          message_list += activity.getMessageList(aq_inner(self),**kw)
Sebastien Robin's avatar
Sebastien Robin committed
998 999
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1000 1001
      return message_list

1002 1003 1004 1005 1006 1007
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.
      """
      message_count = 0
1008
      for activity in activity_dict.itervalues():
1009
        message_count += activity.countMessageWithTag(aq_inner(self), value)
Sebastien Robin's avatar
Sebastien Robin committed
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019
      return message_count

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
Jérome Perrin's avatar
Jérome Perrin committed
1020
        path : for activities on a particular object
Sebastien Robin's avatar
Sebastien Robin committed
1021 1022 1023 1024
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      message_count = 0
1025
      for activity in activity_dict.itervalues():
1026
        message_count += activity.countMessage(aq_inner(self), **kw)
1027 1028
      return message_count

1029
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1030
    def newActiveProcess(self, **kw):
1031 1032 1033
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
1034 1035 1036
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process
1037 1038 1039 1040

    def reindexObject(self):
      self.immediateReindexObject()

1041
    # Active synchronisation methods
1042
    security.declarePrivate('validateOrder')
1043
    def validateOrder(self, message, validator_id, validation_value):
1044 1045 1046 1047 1048
      message_list = self.getDependentMessageList(message, validator_id, validation_value)
      return len(message_list) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
Vincent Pelletier's avatar
Vincent Pelletier committed
1049 1050
      if not is_initialized:
        self.initialize()
1051
      message_list = []
Vincent Pelletier's avatar
Vincent Pelletier committed
1052
      method_id = "_validate_%s" % validator_id
1053
      for activity in activity_dict.itervalues():
1054 1055 1056 1057 1058 1059
        method = getattr(activity, method_id, None)
        if method is not None:
          result = method(aq_inner(self), message, validation_value)
          if result:
            message_list.extend([(activity, m) for m in result])
      return message_list
1060

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1061 1062
    # Required for tests (time shift)
    def timeShift(self, delay):
Vincent Pelletier's avatar
Vincent Pelletier committed
1063 1064
      if not is_initialized:
        self.initialize()
1065
      for activity in activity_dict.itervalues():
1066
        activity.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1067

1068
InitializeClass(ActivityTool)