ActivityTool.py 72.6 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34
from collections import defaultdict
35
from cPickle import dumps, loads
36
from Products.CMFCore import permissions as CMFCorePermissions
37
from Products.CMFActivity.ActiveResult import ActiveResult
38
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
39
from Products.CMFActivity.ActivityConnection import ActivityConnection
40
from Products.PythonScripts.Utility import allow_class
41
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
42 43 44 45
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
46
from AccessControl.User import system as system_user
47
from Products.CMFCore.utils import UniqueObject
48
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
49
from Acquisition import aq_base, aq_inner, aq_parent
50
from ActivityBuffer import ActivityBuffer
51
from ActivityRuntimeEnvironment import BaseMessage
52
from zExceptions import ExceptionFormatter, Redirect
53
from BTrees.OIBTree import OIBTree
54
from BTrees.OOBTree import OOBTree
55 56
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
57
from zope.site.hooks import setSite
58
import transaction
59
from App.config import getConfiguration
60
from Shared.DC.ZRDB.Results import Results
61

62
from zope.globalrequest import getRequest, setRequest
63
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
64

65
from zLOG import LOG, INFO, WARNING, ERROR
66
import warnings
67
from time import time
68 69

try:
70
  from Products.TimerService import getTimerService
71
except ImportError:
72 73
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
74

75
from traceback import format_list, extract_stack
76

Jean-Paul Smets's avatar
Jean-Paul Smets committed
77 78 79 80
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
81 82
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
83
is_running_lock = threading.Lock()
84
currentNode = None
85
_server_address = None
86 87
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
88

89 90 91 92 93
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Other logging channels.
# NOTE(review): 'Tracking' is not a child of 'CMFActivity' (no dotted
# prefix), so it does not inherit the handler installed below; confirm this
# is intended before renaming it.
activity_tracking_logger = logging.getLogger('Tracking')
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(os.path.join(log_directory, 'CMFActivity.log'))
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s", "%Y-%m-%dT%H:%M:%S"))
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    # Entries written to the dedicated file must not also reach the root logger.
    activity_logger.propagate = 0

116 117 118 119 120 121 122 123
def activity_timing_method(method, args, kw):
  """Call method(*args, **kw) and log how long the call took.

  The duration is written to the timing log channel whether the call returns
  or raises; the call's result (or exception) is passed through unchanged.
  """
  started_at = time()
  try:
    result = method(*args, **kw)
  finally:
    elapsed = time() - started_at
    activity_timing_logger.info(
      '%.02fs: %r(*%r, **%r)' % (elapsed, method, args, kw))
  return result

124 125 126 127 128 129 130
def getServerAddress():
    """
    Return current server address, as an "ip:port" string.

    The value is computed on first call and cached in the module-level
    _server_address. The address is taken from the zopewsgi module when it
    has been loaded, otherwise from the first ZServer HTTP server registered
    in asyncore's socket map. If neither is found, both parts are empty and
    the result is ":".
    """
    global _server_address
    if _server_address is None:
        ip = port = ''
        try:
            zopewsgi = sys.modules['Products.ERP5.bin.zopewsgi']
        except KeyError:
            from asyncore import socket_map
            for server in socket_map.values():
                if hasattr(server, 'addr'):
                    # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                    class_name = str(getattr(server, '__class__', 'unknown'))
                    if class_name == 'ZServer.HTTPServer.zhttp_server':
                        ip, port = server.addr
                        break
        else:
            ip, port = zopewsgi.server.addr
        if ip == '0.0.0.0':
            # Listening on all interfaces: report the address this host
            # name resolves to instead of the wildcard address.
            ip = socket.gethostbyname(socket.gethostname())
        _server_address = '%s:%s' %(ip, port)
    return _server_address

def getCurrentNode():
    """ Return current node identifier

    The identifier is read once from the "node-id" key of the "cmfactivity"
    product configuration and cached in the module-level currentNode. When it
    is not configured, a DeprecationWarning is emitted and the server address
    is used instead.
    """
    global currentNode
    if currentNode is None:
        product_config = getattr(getConfiguration(), 'product_config', {})
        currentNode = product_config.get('cmfactivity', {}).get('node-id')
        if currentNode is None:
            warnings.warn(
                'Node name auto-generation is deprecated, please add a'
                '\n'
                '<product-config CMFActivity>\n'
                '  node-id = ...\n'
                '</product-config>\n'
                'section in your zope.conf, replacing "..." with a cluster-unique '
                'node identifier.',
                DeprecationWarning,
            )
            currentNode = getServerAddress()
    return currentNode

169 170 171
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
# The defaultdict creates the per-activity-tool mapping on first access.
global_activity_buffer = defaultdict(dict)
from thread import get_ident
174

175 176 177 178
# Execution states of a Message, as stored in Message.is_executed.
(
  MESSAGE_NOT_EXECUTED,    # 0
  MESSAGE_EXECUTED,        # 1
  MESSAGE_NOT_EXECUTABLE,  # 2
) = range(3)

179

180 181 182 183
class SkippedMessage(Exception):
  """Marker exception for a message that was skipped.

  When a message fails with exactly this exception type,
  Message.setExecutionState() returns without logging anything and without
  recording a traceback.
  """


184
class Message(BaseMessage):
185
  """Activity Message Class.
186

187 188
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
189

190
  active_process = None
191
  active_process_uid = None
192
  call_traceback = None
193
  exc_info = None
194
  exc_type = None
195 196
  is_executed = MESSAGE_NOT_EXECUTED
  traceback = None
197
  document_uid = None
198
  is_registered = False
199

200 201 202
  def __init__(
      self,
      url,
      document_uid,
      active_process,
      active_process_uid,
      activity_kw,
      method_id,
      args, kw,
      request=None,
      portal_activities=None,
    ):
    """Record everything needed to call method_id on an object later.

    url -- path of the target object, stored as object_path (a sequence of
           path elements, see _getObject).
    document_uid -- expected uid of the target object, checked by getObject.
    active_process, active_process_uid -- where the result is posted, if any.
    activity_kw -- activity parameters (group_method_id, signature, ...).
    method_id, args, kw -- the call to perform.
    request -- if given, a few values are copied from it into request_info.
    portal_activities -- if its activity_creation_trace attribute is true,
           the current call stack is saved in call_traceback.

    The id (or user name) of the current user is also saved, in user_name.
    """
    self.object_path = url
    self.document_uid = document_uid
    self.active_process = active_process
    self.active_process_uid = active_process_uid
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    if getattr(portal_activities, 'activity_creation_trace', False):
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
    self.user_name = getSecurityManager().getUser().getIdOrUserName()
    # Store REQUEST Info
    self.request_info = {}
    if request is not None:
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      self.request_info['_script'] = list(request._script)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
238

239 240 241 242 243 244 245 246
  @staticmethod
  def load(s, **kw):
    """Unpickle a message from string s and set extra attributes from kw.

    NOTE: unpickling can execute arbitrary code; s must only come from a
    trusted source.
    """
    message = loads(s)
    vars(message).update(kw)
    return message

  dump = dumps

247 248 249 250 251 252 253
  def getGroupId(self):
    """Return the grouping key: group method id, a NUL byte, then group id.

    Both values are read from activity_kw and default to ''. An explicit
    group_method_id of None is replaced by a dummy group method path built
    from method_id.
    """
    activity_kw = self.activity_kw
    group_method_id = activity_kw.get('group_method_id', '')
    if group_method_id is None:
      group_method_id = 'portal_activities/dummyGroupMethod/' + self.method_id
    group_id = activity_kw.get('group_id', '')
    return group_method_id + '\0' + group_id

254 255 256 257
  def getGroupMethodCost(self):
    """Return the 'group_method_cost' activity parameter, .01 when unset.

    Meaningless if called on a non-grouped message.
    """
    default_cost = .01
    return self.activity_kw.get('group_method_cost', default_cost)

258 259 260 261 262 263
  def _getObject(self, activity_tool):
    """Walk object_path from the physical root, by item access.

    The first element of object_path is skipped. Any error raised by an item
    lookup (for example KeyError) propagates to the caller.
    """
    node = activity_tool.getPhysicalRoot()
    for name in self.object_path[1:]:
      node = node[name]
    return node

264
  def getObject(self, activity_tool):
    """return the object referenced in this message.

    Returns None, after logging a warning and marking the message as
    MESSAGE_NOT_EXECUTABLE, when no object exists at object_path.
    Raises ValueError when document_uid is set and differs from the uid of
    the object found.
    """
    try:
      obj = self._getObject(activity_tool)
    except KeyError:
      LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
          % (self.object_path,), error=True)
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
      return None
    if self.document_uid and self.document_uid != getattr(aq_base(obj), 'uid', None):
      raise ValueError("UID mismatch for %r" % obj)
    return obj

  def getObjectList(self, activity_tool):
    """return the list of object that can be expanded from this message

    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls).

    Returns an empty tuple when the object cannot be found, the result of
    the object's expand method when activity_kw has 'expand_method_id', and
    a 1-tuple holding the object otherwise."""
    obj = self.getObject(activity_tool)
    if obj is None:
      return ()
    if 'expand_method_id' in self.activity_kw:
      return getattr(obj, self.activity_kw['expand_method_id'])()
    return (obj,)

  def getObjectCount(self, activity_tool):
    """Return how many objects this message expands to.

    Without 'expand_method_id' in activity_kw the count is 1. Otherwise it is
    the length of the expand method's result; this is best-effort, and 1 is
    returned if finding the object or expanding it raises a StandardError.
    """
    if 'expand_method_id' in self.activity_kw:
      try:
        obj = self._getObject(activity_tool)
        return len(getattr(obj, self.activity_kw['expand_method_id'])())
      except StandardError:
        # Deliberately ignored: fall back to counting a single object.
        pass
    return 1
298

299
  def changeUser(self, user_name, activity_tool):
    """restore the security context for the calling user.

    The user is looked up by id in the portal's acl_users, then in the
    acl_users of the portal's parent. If still not found and user_name is the
    system user's name, a PrivilegedUser holding all valid roles of the
    portal's user folder is built.

    When a user is found, it becomes the current security manager's user and
    is recorded on the current transaction. Otherwise a warning is logged and
    the security manager is cleared.

    Returns the user, or None when it could not be found.
    """
    portal = activity_tool.getPortalObject()
    portal_uf = portal.acl_users
    uf = portal_uf
    user = uf.getUserById(user_name)
    # if the user is not found, try to get it from a parent acl_users
    # XXX this is still far from perfect, because we need to store all
    # information about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
    if user is None:
      uf = portal.aq_parent.acl_users
      user = uf.getUserById(user_name)
    if user is None and user_name == system_user.getUserName():
      # The following logic partly comes from unrestricted_apply()
      # implementation in ERP5Type.UnrestrictedMethod but we get roles
      # from the portal to have more roles.
      uf = portal_uf
      role_list = uf.valid_roles()
      user = PrivilegedUser(user_name, None, role_list, ()).__of__(uf)
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
      transaction.get().setUser(user_name, '/'.join(uf.getPhysicalPath()))
    else:
      LOG("CMFActivity", WARNING,
          "Unable to find user %r in the portal" % user_name)
      noSecurityManager()
    return user

330 331 332
  def activateResult(self, active_process, result, object):
    """Post result to active_process.

    result is wrapped in an ActiveResult unless it already is one. If
    activity_kw has a true 'signature', it is used as the result's id. The
    result also records object (as object_path) and this message's method_id.
    """
    if not isinstance(result, ActiveResult):
      result = ActiveResult(result=result)
    signature = self.activity_kw.get('signature')
    if signature:
      result.edit(id=signature)
    # XXX Allow other method_id in future
    result.edit(object_path=object, method_id=self.method_id)
    active_process.postResult(result)
339

Jean-Paul Smets's avatar
Jean-Paul Smets committed
340
  def __call__(self, activity_tool):
    """Execute the message: call method_id on the referenced object.

    The call is made as the user who created the message; the previous
    security manager is restored afterwards. On success, a non-None result is
    posted to the active process (if any) and the message state becomes
    MESSAGE_EXECUTED. Nothing is raised: on any failure the state becomes
    MESSAGE_NOT_EXECUTED, with exception details recorded by
    setExecutionState. If the object cannot be found, getObject has already
    set the state and nothing more is done.
    """
    try:
      obj = self.getObject(activity_tool)
      if obj is not None:
        old_security_manager = getSecurityManager()
        try:
          # Change user if required (TO BE DONE)
          # We will change the user only in order to execute this method
          self.changeUser(self.user_name, activity_tool)
          # XXX: There is no check to see if user is allowed to access
          #      that method !
          method = getattr(obj, self.method_id)
          transaction.get().note(
            'CMFActivity ' + '/'.join(self.object_path) + '/' + self.method_id
          )
          # Store site info
          setSite(activity_tool.getParentValue())
          if activity_tool.activity_timing_log:
            result = activity_timing_method(method, self.args, self.kw)
          else:
            result = method(*self.args, **self.kw)
        finally:
          setSecurityManager(old_security_manager)

        # No "method is not None" check is needed here: reaching this point
        # means the method was called successfully.
        if self.active_process and result is not None:
          self.activateResult(
            activity_tool.unrestrictedTraverse(self.active_process),
            result, obj)
        self.setExecutionState(MESSAGE_EXECUTED)
    except:
      # Deliberate catch-all: whatever went wrong must end up recorded in the
      # message state instead of propagating to the caller.
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
372

373
  def notifyUser(self, activity_tool, retry=False):
374
    """Notify the user that the activity failed."""
375 376 377
    if not activity_tool.activity_failure_mail_notification:
      return

378
    portal = activity_tool.getPortalObject()
379
    user_email = portal.getProperty('email_to_address',
380
                       portal.getProperty('email_from_address'))
381 382
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
383
    fail_count = self.line.retry + 1
384
    if retry:
385 386 387 388
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
389
    mail_text = """From: %s <%s>
390
To: %s
391
Subject: %s: %s/%s
392

393
Node: %s
394
Failures: %s
395
User name: %r
396
Uid: %u
397 398
Document: %s
Method: %s
399 400
Arguments: %r
Named Parameters: %r
401
""" % (email_from_name, activity_tool.email_from_address, user_email, message,
402
       path, self.method_id, getCurrentNode(), fail_count,
403
       self.user_name, self.line.uid, path, self.method_id, self.args, self.kw)
404 405 406 407
    if self.traceback:
      mail_text += '\nException:\n' + self.traceback
    if self.call_traceback:
      mail_text += '\nCreated at:\n' + self.call_traceback
408
    try:
409
      portal.MailHost.send(mail_text)
Vincent Pelletier's avatar
Vincent Pelletier committed
410
    except (socket.error, MailHostError), message:
411 412
      LOG('ActivityTool.notifyUser', WARNING,
          'Mail containing failure information failed to be sent: %s' % message)
413

414
  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
    """Queue the same call again on the original object.

    The object is activated with the given activity and this message's
    activity_kw, then method_id is called on it with the original arguments.
    This is done as the user who created the message; the previous security
    manager is restored afterwards.
    """
    # Reactivate the original object.
    obj = self._getObject(activity_tool)
    old_security_manager = getSecurityManager()
    try:
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      self.changeUser(self.user_name, activity_tool)
      active_obj = obj.activate(activity=activity, **self.activity_kw)
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
      setSecurityManager(old_security_manager)
427

428 429 430 431 432 433
  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).

      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emitted with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsibility to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is MESSAGE_NOT_EXECUTED, exception details are stored
      too: the exception type always, and the formatted traceback unless the
      exception type is SkippedMessage (in which case nothing is logged
      either). exc_info itself is only kept when on_error_callback is set.
      If exc_info is not given, it is extracted using sys.exc_info().
      If final exc_info does not contain any exception, current stack trace
      will be stored instead: it will hopefully help understand why message
      is in an error state.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
    if is_executed == MESSAGE_NOT_EXECUTED:
      if not exc_info:
        exc_info = sys.exc_info()
      if self.on_error_callback is not None:
        self.exc_info = exc_info
      self.exc_type = exc_info[0]
      if exc_info[0] is None:
        # Raise a dummy exception, ignore it, fetch it and use it as if it
        # was the error causing message non-execution. This will help
        # identifying the cause of this misbehaviour.
        try:
          raise Exception('Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.')
        except Exception:
          exc_info = sys.exc_info()
      elif exc_info[0] is SkippedMessage:
        return
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info)[1:])
476 477 478 479

  def getExecutionState(self):
    """Return the execution state last stored by setExecutionState().

    This is one of the MESSAGE_* constants; a message whose state was never
    set reports the class default, MESSAGE_NOT_EXECUTED.
    """
    return self.is_executed

480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
class GroupedMessage(object):
  """Pairs an object with a message, and carries the outcome for that object.

  The message's args and kw are exposed read-only. The outcome is stored in
  the 'result' slot, or in 'exc_info' once raised() has been called.
  NOTE(review): presumably handed to grouping methods, one instance per
  object/message pair - confirm against the callers.
  """
  __slots__ = 'object', '_message', 'result', 'exc_info'

  def __init__(self, object, message):
    self.object = object
    self._message = message

  args = property(lambda self: self._message.args)
  kw = property(lambda self: self._message.kw)

  def raised(self, exc_info=None):
    """Record a failure: store exc_info (default: the exception currently
    being handled) and discard any result set previously."""
    self.exc_info = exc_info or sys.exc_info()
    try:
      del self.result
    except AttributeError:
      # No result had been set.
      pass

  # XXX: Allowing restricted code to implement a grouping method is questionable
  #      but there already exist some.
  __parent__ = property(lambda self: self.object) # for object
  _guarded_writes = 1 # for result
501
allow_class(GroupedMessage)
502 503 504

# Activity Registration
def activity_dict():
  # Build the {name: activity instance} registry. Each imported module is
  # expected to define a class named after itself, which is instantiated.
  # The import must stay the only thing binding local names here, because
  # locals() is used as the list of activities.
  from Activity import SQLDict, SQLQueue, SQLJoblib
  return {k: getattr(v, k)() for k, v in locals().iteritems()}
# The function is only needed once: replace it with the registry it builds.
activity_dict = activity_dict()


Vincent Pelletier's avatar
Vincent Pelletier committed
510 511
class Method(object):
  """Callable which, when called, queues a Message instead of running code.

  Instances hold everything but the call arguments: the activity tool, the
  target object's path and uid, the activity name, the active process, the
  activity parameters, the method id and the request.
  """
  __slots__ = (
    '_portal_activities',
    '_passive_url',
    '_passive_uid',
    '_activity',
    '_active_process',
    '_active_process_uid',
    '_kw',
    '_method_id',
    '_request',
  )

  def __init__(self, portal_activities, passive_url, passive_uid, activity,
      active_process, active_process_uid, kw, method_id, request):
    self._portal_activities = portal_activities
    self._passive_url = passive_url
    self._passive_uid = passive_uid
    self._activity = activity
    self._active_process = active_process
    self._active_process_uid = active_process_uid
    self._kw = kw
    self._method_id = method_id
    self._request = request

  def __call__(self, *args, **kw):
    """Build a Message for this call and hand it to the activity buffer.

    Nothing is returned. When activity tracking is enabled on the activity
    tool and the message got registered, the queuing is also logged on the
    tracking channel.
    """
    portal_activities = self._portal_activities
    m = Message(
      url=self._passive_url,
      document_uid=self._passive_uid,
      active_process=self._active_process,
      active_process_uid=self._active_process_uid,
      activity_kw=self._kw,
      method_id=self._method_id,
      args=args,
      kw=kw,
      request=self._request,
      portal_activities=portal_activities,
    )
    portal_activities.getActivityBuffer().deferredQueueMessage(
      portal_activities, activity_dict[self._activity], m)
    if portal_activities.activity_tracking and m.is_registered:
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
553

554 555
allow_class(Method)

Vincent Pelletier's avatar
Vincent Pelletier committed
556 557 558
class ActiveWrapper(object):
  """Proxy on which any attribute lookup returns a Method.

  Looking up an attribute that is not defined on this class yields a Method
  bound to the wrapped object's path/uid, the activity and its parameters;
  calling that Method queues a message. State is kept in double-underscore
  (name-mangled) slots so that it is never mistaken for a method name.
  """
  __slots__ = (
    '__portal_activities',
    '__passive_url',
    '__passive_uid',
    '__activity',
    '__active_process',
    '__active_process_uid',
    '__kw',
    '__request',
  )
  # Shortcut security lookup (avoid calling __getattr__)
  __parent__ = None

  def __init__(self, portal_activities, url, uid, activity, active_process,
      active_process_uid, kw, request):
    # second parameter can be an object or an object's path
    self.__portal_activities = portal_activities
    self.__passive_url = url
    self.__passive_uid = uid
    self.__activity = activity
    self.__active_process = active_process
    self.__active_process_uid = active_process_uid
    self.__kw = kw
    self.__request = request

  def __getattr__(self, name):
    """Return a Method which queues a call to `name` when called."""
    return Method(
      self.__portal_activities,
      self.__passive_url,
      self.__passive_uid,
      self.__activity,
      self.__active_process,
      self.__active_process_uid,
      self.__kw,
      name,
      self.__request,
    )

  def __repr__(self):
    return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
                                   self.__passive_url)
598

599 600 601
# True when activities cannot be executing any more.
has_processed_shutdown = False

602 603 604 605 606 607 608 609
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.

    It releases the module-level is_running_lock and resets the
    has_processed_shutdown flag.
  """
  global has_processed_shutdown
  # NOTE(review): presumably process_shutdown acquired is_running_lock - not
  # visible here. Releasing a lock that is not held raises, in which case
  # the flag below is left untouched.
  is_running_lock.release()
  has_processed_shutdown = False
610

611 612 613
# Due to a circular import dependency between this module and
# Products.ERP5Type.Core.Folder, both modules must import after the definitions
# of getCurrentNode and Folder (the latter is a base class of BaseTool).
614 615 616 617 618 619
from Products.ERP5Type.Tool.BaseTool import BaseTool
# Activating a path means we tried to avoid loading useless
# data in cache so there would be no gain to expect.
# And all nodes are likely to have tools already loaded.
NO_DEFAULT_NODE_PREFERENCE = str, BaseTool

620
class ActivityTool (BaseTool):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
621
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
622 623 624 625 626 627 628 629 630 631 632 633
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
634 635 636
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
637
    portal_type = 'Activity Tool'
638
    title = 'Activities'
639
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
640 641
    security = ClassSecurityInfo()

642 643
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
644
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
645
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
646
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
647
                     ,
648
                     ] + list(BaseTool.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
649 650 651 652

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

653 654 655
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

656 657
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
658

659
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
660
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals(), _getCurrentNode=getCurrentNode)
661

662 663
    distributingNode = ''
    _nodes = ()
664 665
    _family_list = ()
    _node_family_dict = None
666
    activity_creation_trace = False
667
    activity_tracking = False
668
    activity_timing_log = False
669
    activity_failure_mail_notification = True
670
    cancel_and_invoke_links_hidden = False
671 672 673 674

    # Filter content (ZMI)
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only the entries of
        # all_meta_types() whose 'name' is listed in allowed_types are kept.
        # `user` is accepted for signature compatibility and is not used.
        # The former unused call to BaseTool.filtered_meta_types (stored in
        # a local shadowing the builtin `all`) has been dropped.
        allowed_types = self.allowed_types
        return [
            meta_type
            for meta_type in self.all_meta_types()
            if meta_type['name'] in allowed_types
        ]

682 683 684
    def getSQLConnection(self):
      # Call cmf_activity_sql_connection on the container of this tool
      # (acquisition wrappers stripped down to the innermost one) and
      # return the result.
      container = self.aq_inner.aq_parent
      connection_factory = container.cmf_activity_sql_connection
      return connection_factory()

685 686 687 688 689 690 691 692 693 694 695 696 697 698
    def maybeMigrateConnectionClass(self):
      # Replace the 'cmf_activity_sql_connection' object by an
      # ActivityConnection carrying the same title and connection string,
      # unless it is missing or already of that class.
      connection_id = 'cmf_activity_sql_connection'
      old_connection = getattr(self, connection_id, None)
      if old_connection is None:
        return
      if isinstance(old_connection, ActivityConnection):
        return
      # SQL Connection migration is needed
      LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
      container = aq_parent(aq_inner(old_connection))
      container._delObject(old_connection.getId())
      migrated_connection = ActivityConnection(
        connection_id,
        old_connection.title,
        old_connection.connection_string,
      )
      container._setObject(connection_id, migrated_connection)

699
    security.declarePrivate('initialize')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
700
    def initialize(self):
      # Migrate the SQL connection class if needed, initialize every
      # registered activity (without clearing), then drop the legacy
      # 'activity' skin folder when it is still present.
      self.maybeMigrateConnectionClass()
      for activity in activity_dict.itervalues():
        activity.initialize(self, clear=False)
      # Remove old skin if any.
      skins_tool = self.getPortalObject().portal_skins
      name = 'activity'
      old_skin_path = getattr(skins_tool.get(name), '_dirpath', None)
      if old_skin_path != 'Products.CMFActivity:skins/activity':
        return
      for selection, skins in skins_tool.getSkinPaths():
        layer_list = skins.split(',')
        if name not in layer_list:
          # This selection does not reference the old skin.
          continue
        layer_list.remove(name)
        skins_tool.manage_skinLayers(
          add_skin=1, skinname=selection, skinpath=layer_list)
      skins_tool._delObject(name)
718

719 720 721
    def _callSafeFunction(self, batch_function):
      # Call batch_function without arguments and return its result.
      # NOTE(review): presumably a hook meant to be overridden/wrapped
      # elsewhere - confirm against callers.
      return batch_function()

722 723
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
      """
      Tell whether this tool is subscribed to TimerService.

      Returns False, after logging, when TimerService is not available.
      """
      service = getTimerService(self)
      if not service:
        LOG('ActivityTool', INFO, 'TimerService not available')
        return False
      # NOTE(review): "lisSubscriptions" looks like a typo but is presumably
      # the name TimerService really exposes - confirm before renaming.
      physical_path = '/'.join(self.getPhysicalPath())
      return physical_path in service.lisSubscriptions()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
734

735
    security.declareProtected(Permissions.manage_properties, 'subscribe')
736
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """
        Subscribe to the global Timer Service.

        When RESPONSE is given, redirect to the load balancing tab with a
        message telling whether the subscription could be done.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' % self.absolute_url()
        if service:
            service.subscribe(self)
            message = "Subscribed to Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            message = 'TimerService not available'
        url += urllib.quote(message)
        if RESPONSE is not None:
            RESPONSE.redirect(url)
748 749

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
750
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """
        Unsubscribe from the global Timer Service.

        When RESPONSE is given, redirect to the load balancing tab with a
        message telling whether the unsubscription could be done.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' % self.absolute_url()
        if service:
            service.unsubscribe(self)
            message = "Unsubscribed from Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            message = 'TimerService not available'
        url += urllib.quote(message)
        if RESPONSE is not None:
            RESPONSE.redirect(url)
762

763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      # Return the activity_tracking flag (set by
      # manage_enableActivityTracking / manage_disableActivityTracking).
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity tracking.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_tracking = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Tracking log enabled'),
          ))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity tracking.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_tracking = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Tracking log disabled'),
          ))

789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814
    security.declareProtected(Permissions.manage_properties, 'isActivityMailNotificationEnabled')
    def isActivityMailNotificationEnabled(self):
      # Return the activity_failure_mail_notification flag (set by
      # manage_enableMailNotification / manage_disableMailNotification).
      return self.activity_failure_mail_notification

    security.declareProtected(Permissions.manage_properties, 'manage_enableMailNotification')
    def manage_enableMailNotification(self, REQUEST=None, RESPONSE=None):
        """
          Enable mail notification when activity fails.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_failure_mail_notification = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Mail notification enabled'),
          ))

    security.declareProtected(Permissions.manage_properties, 'manage_disableMailNotification')
    def manage_disableMailNotification(self, REQUEST=None, RESPONSE=None):
        """
          Disable mail notification when activity fails.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_failure_mail_notification = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Mail notification disabled'),
          ))

815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866
    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      # Return the activity_timing_log flag (set by
      # manage_enableActivityTimingLogging /
      # manage_disableActivityTimingLogging).
      return self.activity_timing_log

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity timing logging.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_timing_log = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Timing log enabled'),
          ))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity timing logging.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_timing_log = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Timing log disabled'),
          ))

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      # Return the activity_creation_trace flag (set by
      # manage_enableActivityCreationTrace /
      # manage_disableActivityCreationTrace).
      return self.activity_creation_trace

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity creation trace.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_creation_trace = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Activity creation trace enabled'),
          ))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity creation trace.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.activity_creation_trace = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Activity creation trace disabled'),
          ))

867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      # Return the cancel_and_invoke_links_hidden flag (set by
      # manage_hideCancelAndInvokeLinks / manage_showCancelAndInvokeLinks).
      return self.cancel_and_invoke_links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Set the flag hiding "cancel" and "invoke" links.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Cancel and invoke links hidden'),
          ))

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Clear the flag hiding "cancel" and "invoke" links.

          Redirects to the advanced tab when RESPONSE is given.
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Cancel and invoke links visible'),
          ))

891
    security.declarePrivate('manage_beforeDelete')
892 893
    def manage_beforeDelete(self, item, container):
        # Unsubscribe from the Timer Service before delegating to the
        # inherited manage_beforeDelete.
        self.unsubscribe()
        inherited_hook = BaseTool.inheritedAttribute('manage_beforeDelete')
        inherited_hook(self, item, container)
895 896
    
    security.declarePrivate('manage_afterAdd')
897 898
    def manage_afterAdd(self, item, container):
        # Subscribe to the Timer Service before delegating to the inherited
        # manage_afterAdd.
        self.subscribe()
        inherited_hook = BaseTool.inheritedAttribute('manage_afterAdd')
        inherited_hook(self, item, container)
900

901
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getServerAddress')
902 903 904 905
    def getServerAddress(self):
        """
        Backward-compatibility code only.

        Emits a DeprecationWarning attributed to the caller, then returns
        the result of the module-level getServerAddress function.
        """
        deprecation_message = '"getServerAddress" class method is deprecated, use "getServerAddress" module-level function instead.'
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        # Inside the method body this name resolves to the module-level
        # function, not to this method.
        return getServerAddress()
912

913
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNode')
914
    def getCurrentNode(self):
        """
        Backward-compatibility code only.

        Emits a DeprecationWarning attributed to the caller, then returns
        the result of the module-level getCurrentNode function.
        """
        deprecation_message = '"getCurrentNode" class method is deprecated, use "getCurrentNode" module-level function instead.'
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        # Inside the method body this name resolves to the module-level
        # function, not to this method.
        return getCurrentNode()
924

925
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getDistributingNode')
926 927 928 929
    def getDistributingNode(self):
        """ Return the distributingNode attribute ('' by class default). """
        return self.distributingNode

930 931 932 933 934 935 936 937 938 939 940 941 942
    def getNodeList(self, role=None):
      # Return the sorted list of known node ids; when `role` is given,
      # only nodes registered with exactly that role are listed.
      node_dict = self.getNodeDict()
      if role is None:
        return sorted(node_dict.keys())
      return sorted(
        node_id
        for node_id, node_role in node_dict.items()
        if node_role == role
      )

    def getNodeDict(self):
      # Return the node -> role mapping. A tuple found in _nodes (as with
      # the class default) is converted on the fly into an OIBTree in
      # which every listed node gets ROLE_PROCESSING, and stored back.
      nodes = self._nodes
      if not isinstance(nodes, tuple):
        return nodes
      migrated_nodes = OIBTree()
      migrated_nodes.update([(node, ROLE_PROCESSING) for node in nodes])
      self._nodes = migrated_nodes
      return migrated_nodes

947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
    def _getNodeFamilyIdDict(self):
      # Return the node id -> family id tuple mapping, creating it as an
      # empty OOBTree on first use.
      family_dict = self._node_family_dict
      if family_dict is not None:
        return family_dict
      family_dict = OOBTree()
      self._node_family_dict = family_dict
      return family_dict

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyIdSet')
    def getCurrentNodeFamilyIdSet(self):
      """
      Return the family ids the current node is a member of.

      This is the value stored for the current node, or an empty tuple
      when nothing is stored for it.
      """
      node_family_id_dict = self._getNodeFamilyIdDict()
      return node_family_id_dict.get(getCurrentNode(), ())

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyNameSet')
    def getCurrentNodeFamilyNameSet(self):
      """
      Return the list of family names the current node is a member of.
      """
      family_id_list = self._getNodeFamilyIdDict().get(getCurrentNode(), ())
      family_name_list = []
      for family_id in family_id_list:
        # Family ids are negative: -1 is the first family, -2 the second...
        family_name_list.append(self._family_list[-family_id - 1])
      return family_name_list

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyId')
    def getFamilyId(self, name):
      """
      Return the (negative) id of the family called `name`.

      Raises ValueError for unknown family names.
      """
      position = self._family_list.index(name)
      # First family is -1, second is -2, etc.
      return -(position + 1)

    security.declareProtected(CMFCorePermissions.ManagePortal, 'addNodeToFamily')
    def addNodeToFamily(self, node_id, family_name):
      """
      Make node_id a member of family_name.

      Silently does nothing if node is already a member of family_name.
      Raises ValueError (from getFamilyId) for unknown family names.
      """
      family_id = self.getFamilyId(family_name)
      node_family_id_dict = self._getNodeFamilyIdDict()
      current_family_id_list = node_family_id_dict.get(node_id, ())
      if family_id in current_family_id_list:
        return
      # Store a new tuple rather than mutating the stored value.
      node_family_id_dict[node_id] = current_family_id_list + (family_id, )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addNodeSetToFamily')
    def manage_addNodeSetToFamily(self, family_new_node_list, REQUEST):
      """
      Add selected nodes to family.

      The family name is read from REQUEST['manage_addNodeSetToFamily'].
      """
      family_name = REQUEST['manage_addNodeSetToFamily']
      # Accept a single node id given as a bare string.
      if isinstance(family_new_node_list, basestring):
        node_id_list = [family_new_node_list]
      else:
        node_id_list = family_new_node_list
      for node_id in node_id_list:
        self.addNodeToFamily(node_id, family_name)
      redirect_url = (
        REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote('Nodes added to family.')
      )
      REQUEST.RESPONSE.redirect(redirect_url)

    security.declareProtected(CMFCorePermissions.ManagePortal, 'removeNodeFromFamily')
    def removeNodeFromFamily(self, node_id, family_name):
      """
      Remove node_id from family_name.

      Silently does nothing if node is not member of family_name.
      Raises ValueError (from getFamilyId) for unknown family names.
      """
      family_id = self.getFamilyId(family_name)
      node_family_id_dict = self._getNodeFamilyIdDict()
      current_family_id_list = node_family_id_dict.get(node_id, ())
      if family_id not in current_family_id_list:
        return
      node_family_id_dict[node_id] = tuple([
        other_id
        for other_id in current_family_id_list
        if other_id != family_id
      ])

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeNodeSetFromFamily')
    def manage_removeNodeSetFromFamily(self, REQUEST):
      """
      Remove selected nodes from family.

      The family name is read from REQUEST['manage_removeNodeSetFromFamily']
      and the nodes from REQUEST['family_member_set_' + family name].
      """
      family_name = REQUEST['manage_removeNodeSetFromFamily']
      selected = REQUEST['family_member_set_' + family_name]
      # Accept a single node id given as a bare string.
      if isinstance(selected, basestring):
        node_id_list = [selected]
      else:
        node_id_list = selected
      for node_id in node_id_list:
        self.removeNodeFromFamily(node_id, family_name)
      redirect_url = (
        REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote('Nodes removed from family.')
      )
      REQUEST.RESPONSE.redirect(redirect_url)

    def _checkFamilyName(self, name):
      if not isinstance(name, basestring):
        raise TypeError('Name must be a string')
      if name in self._family_list:
        raise ValueError('Already in use')
      if name in ('', 'same'):
        raise ValueError('Reserved family name')

    security.declareProtected(CMFCorePermissions.ManagePortal, 'createFamily')
    def createFamily(self, name):
      """
      Create a family called `name`.

      The first free slot (None entry) of the family list is reused when
      there is one; otherwise the name is appended.

      Raises ValueError if family already exists (see _checkFamilyName for
      the other rejected names).
      """
      self._checkFamilyName(name)
      family_list = self._family_list
      free_position = None
      for position, existing_name in enumerate(family_list):
        if existing_name is None:
          free_position = position
          break
      if free_position is None:
        # No free spot, append.
        self._family_list += (name, )
      else:
        # A free spot has been recycled.
        self._family_list = (
          tuple(family_list[:free_position]) +
          (name, ) +
          tuple(family_list[free_position + 1:])
        )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_createFamily')
1064
    def manage_createFamily(self, new_family_name, REQUEST, family_new_node_list=None):
      """Create a family and add the optional initial nodes to it.

      A ValueError from creation or node addition is turned into a Redirect
      carrying the error text; success redirects with 'Family created.'.
      """
      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      # Normalise the node selection: nothing, one bare id, or a sequence.
      if family_new_node_list is None:
        node_id_list = []
      elif isinstance(family_new_node_list, basestring):
        node_id_list = [family_new_node_list]
      else:
        node_id_list = family_new_node_list
      try:
        self.createFamily(new_family_name)
        for node_id in node_id_list:
          self.addNodeToFamily(node_id, new_family_name)
      except ValueError as exc:
        raise Redirect(redirect_url + urllib.quote(str(exc)))
      success_url = redirect_url + urllib.quote('Family created.')
      REQUEST.RESPONSE.redirect(success_url)

    security.declareProtected(CMFCorePermissions.ManagePortal, 'renameFamily')
    def renameFamily(self, old_name, new_name):
      """
      Rename family old_name into new_name, keeping its slot (and so its id).

      Raises ValueError if old_name does not exist, and whatever
      _checkFamilyName raises for an unacceptable new_name.
      """
      self._checkFamilyName(new_name)
      current_family_list = self._family_list
      if old_name not in current_family_list:
        raise ValueError('Unknown family')
      renamed_family_list = []
      for family_name in current_family_list:
        if family_name == old_name:
          renamed_family_list.append(new_name)
        else:
          renamed_family_list.append(family_name)
      self._family_list = tuple(renamed_family_list)

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_renameFamily')
    def manage_renameFamily(self, REQUEST):
      """Rename a family.

      The old name is read from REQUEST['manage_renameFamily'] and the new
      one from REQUEST['family_new_name_' + old name]. A ValueError is
      turned into a Redirect carrying the error text.
      """
      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      old_family_name = REQUEST['manage_renameFamily']
      new_family_name = REQUEST['family_new_name_' + old_family_name]
      try:
        self.renameFamily(old_family_name, new_family_name)
      except ValueError as exc:
        raise Redirect(redirect_url + urllib.quote(str(exc)))
      else:
        REQUEST.RESPONSE.redirect(
          redirect_url + urllib.quote('Family renamed.'))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'deleteFamily')
    def deleteFamily(self, name):
      """
      Delete family `name`: remove it from every node, then free its slot.

      The slot in the family list is set to None instead of being removed,
      so the position-based ids of the other families do not change.

      Raises ValueError if name does not exist.
      """
      # None marks a free slot in the family list, it is never a family.
      if name is None:
        raise ValueError('Unknown family')
      # Validate up front: previously the ValueError only surfaced through
      # removeNodeFromFamily, i.e. only when at least one node was known,
      # and unknown names were silently accepted otherwise.
      self.getFamilyId(name)
      for node_id in self._getNodeFamilyIdDict():
        self.removeNodeFromFamily(node_id, name)
      self._family_list = tuple(
        None if x == name else x
        for x in self._family_list
      )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_deleteFamily')
    def manage_deleteFamily(self, REQUEST):
      """Delete the family named in REQUEST['manage_deleteFamily'].

      A ValueError is turned into a Redirect carrying the error text.
      """
      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      family_name = REQUEST['manage_deleteFamily']
      try:
        self.deleteFamily(family_name)
      except ValueError as exc:
        raise Redirect(redirect_url + urllib.quote(str(exc)))
      else:
        REQUEST.RESPONSE.redirect(
          redirect_url + urllib.quote('Family deleted'))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyNameList')
    def getFamilyNameList(self):
      """
      Return the list of existing family names, in slot order.

      Free slots (None entries) are skipped.
      """
      family_name_list = []
      for family_name in self._family_list:
        if family_name is not None:
          family_name_list.append(family_name)
      return family_name_list

1135
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyNodeList')
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146
    def getFamilyNodeList(self, family_name):
      """
      Return the list of node names in given family.

      Raises ValueError (from getFamilyId) for unknown family names.
      """
      family_id = self.getFamilyId(family_name)
      member_node_list = []
      for node_id, family_id_list in self._getNodeFamilyIdDict().items():
        if family_id in family_id_list:
          member_node_list.append(node_id)
      return member_node_list

1147 1148
    def registerNode(self, node):
      # Make sure `node` is present in the node dict; does nothing when it
      # already is.
      node_dict = self.getNodeDict()
      if node in node_dict:
        return
      if node_dict:
        # BBB: check if our node was known by address (processing and/or
        # distribution), and migrate it.
        server_address = getServerAddress()
        role = node_dict.pop(server_address, ROLE_IDLE)
        if self.distributingNode == server_address:
          self.distributingNode = node
      else:
        # We are registering the first node, make
        # it both the distributing node and a processing node.
        role = ROLE_PROCESSING
        self.distributingNode = node
      self.updateNode(node, role)

    def updateNode(self, node, role):
      # Record `role` for `node` in the node dict, adding the node when it
      # is not known yet.
      self.getNodeDict()[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      # Return the node list restricted to nodes whose role is
      # ROLE_PROCESSING (see getNodeList).
      return self.getNodeList(role=ROLE_PROCESSING)

1172
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
1173 1174
    def getIdleNodeList(self):
      # Return the node list restricted to nodes whose role is ROLE_IDLE
      # (see getNodeList).
      return self.getNodeList(role=ROLE_IDLE)
1175

1176 1177
    def _isValidNodeName(self, node_name):
      """Tell whether node_name is acceptable: it only has to be a str."""
      is_string = isinstance(node_name, str)
      return is_string
1179

1180
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_setDistributingNode')
1181
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """
        Set the distributing node.

        Any false value (e.g. an empty string) or a valid node name is
        stored; anything else is refused and leaves the setting untouched.
        When REQUEST is given, redirect with the outcome as message.
        """
        is_accepted = (
          not distributingNode or self._isValidNodeName(distributingNode))
        if is_accepted:
          self.distributingNode = distributingNode
          message = "Distributing Node successfully changed."
        else:
          message = "Malformed Distributing Node."
        if REQUEST is None:
          return
        REQUEST.RESPONSE.redirect(
            REQUEST.URL1 +
            '/manageLoadBalancing?manage_tabs_message=' +
            urllib.quote(message))

1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """
      Delete selected unused nodes.

      When the distributing node is among the deleted nodes, the
      distributing node setting is cleared. When REQUEST is given, redirect
      to the load balancing tab with a message describing what was done.
      """
      distributing_node = self.getDistributingNode()
      updated_distributing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == distributing_node:
            # Clear the real setting: this used to assign an unrelated
            # "processing_node" attribute, leaving the deleted node
            # configured as distributing node.
            self.distributingNode = ''
            updated_distributing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if updated_distributing_node:
          # Leading space keeps the two sentences apart.
          message += " Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """
      Change one or more idle nodes into processing nodes.

      When REQUEST is given, redirect to the load balancing tab with a
      message describing what was done.
      """
      has_selection = unused_node_list is not None
      if has_selection:
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is None:
        return
      if has_selection:
        message = "Nodes now procesing: %r." % (unused_node_list, )
      else:
        message = "No unused node selected, nothing done."
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """
      Change one or more processing nodes into idle nodes.

      When REQUEST is given, redirect to the load balancing tab with a
      message describing what was done.
      """
      has_selection = processing_node_list is not None
      if has_selection:
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is None:
        return
      if has_selection:
        message = "Nodes now unused %r." % (processing_node_list, )
      else:
        message = "No used node selected, nothing done."
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))
1253

1254
    security.declarePrivate('process_shutdown')
1255 1256 1257 1258 1259
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.

          Only acts once, on phase 3: sets the module-level
          has_processed_shutdown flag, then blocks until is_running_lock
          can be acquired.
        """
        global has_processed_shutdown
        if phase != 3 or has_processed_shutdown:
          return
        has_processed_shutdown = True
        LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
        # The lock is kept on purpose; cancelProcessShutdown releases it.
        is_running_lock.acquire()
        LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

1267
    security.declareProtected(CMFCorePermissions.ManagePortal, 'process_timer')
1268
    def process_timer(self, tick, interval, prev="", next=""):
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.

      Exceptions raised while processing are logged and swallowed.
      """
      # Prevent TimerService from starting multiple threads in parallel:
      # when the lock is already taken, skip this tick altogether.
      if not timerservice_lock.acquire(0):
        return
      try:
        # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
        # but on 2.2 it's by traversal, and our site probably wasn't traversed
        # by the timerserver request, which goes into the Zope Control_Panel
        # calling it a second time is a harmless and cheap no-op.
        # both setupCurrentSkin and REQUEST are acquired from containers.
        self.setupCurrentSkin(self.REQUEST)
        previous_security_manager = getSecurityManager()
        try:
          # get owner of portal_catalog, so normally we should be able to
          # have the permission to invoke all activities
          catalog_owner = self.portal_catalog.getWrappedOwner()
          newSecurityManager(self.REQUEST, catalog_owner)

          current_node = getCurrentNode()
          self.registerNode(current_node)
          processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

          # only distribute when we are the distributingNode
          if self.getDistributingNode() == current_node:
            self.distribute(len(processing_node_list))

          # SkinsTool uses a REQUEST cache to store skin objects, as
          # with TimerService we have the same REQUEST over multiple
          # portals, we clear this cache to make sure the cache doesn't
          # contains skins from another portal.
          try:
            self.getPortalObject().portal_skins.changeSkin(None)
          except AttributeError:
            pass

          # call tic for the current processing_node
          # the processing_node numbers are the indices of the elements
          # in the node list +1 because processing_node starts from 1
          if current_node in processing_node_list:
            self.tic(processing_node_list.index(current_node) + 1)
        except:
          # Catch ALL exception to avoid killing timerserver.
          LOG('ActivityTool', ERROR, 'process_timer received an exception',
              error=True)
        finally:
          setSecurityManager(previous_security_manager)
      finally:
        timerservice_lock.release()
1321

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1322 1323 1324 1325 1326 1327
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load: call distribute() on every activity queue,
        passing this tool (without its outer acquisition wrappers) and
        node_count.
      """
      for queue in activity_dict.itervalues():
        queue.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1330

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1331
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1332
    def tic(self, processing_node=1, force=0):
      """
        Dequeue and process activity messages for the given processing node.

        processing_node starts from 1 (there is no node 0)
        force -- when true, run even if max_active_threads is already reached

        Raises RuntimeError('Too many threads') when the thread limit is
        reached and force is false.
      """
      global active_threads

      # Refuse to run if the number of threads is too high (unless forced),
      # else account for this thread and continue.
      with tic_lock:
        if active_threads >= max_active_threads and not force:
          raise RuntimeError('Too many threads')
        active_threads += 1

      inner_self = aq_inner(self)

      try:
        # Loop as long as there are activities. Always process the queue with
        # "highest" priority. If several queues have same highest priority,
        # use a round-robin algorithm.
        # XXX: We always finish by iterating over all queues, in case that
        #      getPriority does not see messages dequeueMessage would process.
        queue_list = activity_dict.values()
        def priority_of(queue):
          # node_family_id_set is looked up at call time: it is (re)bound at
          # the start of every iteration of the loop below, before sorting.
          return queue.getPriority(self, processing_node,
            node_family_id_set)
        while is_running_lock.acquire(0):
          try:
            # May have changed since previous iteration.
            node_family_id_set = self.getCurrentNodeFamilyIdSet()
            queue_list.sort(key=priority_of) # stable sort
            for position, queue in enumerate(queue_list):
              # Transaction processing is the responsability of the activity
              if not queue.dequeueMessage(inner_self, processing_node,
                node_family_id_set):
                # Move this queue to the end (round-robin between queues of
                # equal priority, thanks to the stable sort) and start over.
                queue_list.append(queue_list.pop(position))
                break
            else:
              # Every queue returned a true value: stop looping.
              break
          finally:
            is_running_lock.release()
      finally:
        # decrease the number of active_threads
        with tic_lock:
          active_threads -= 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1379

1380
    def hasActivity(self, *args, **kw):
      # Check in each queue if the object has deferred tasks.
      # The object is given as the single positional argument; if no argument
      # is provided, then check on self. When the object is None, path=None is
      # passed to the queues. Extra keyword arguments are forwarded to each
      # queue's hasActivitySQL().
      if args:
        (obj,) = args
      else:
        obj = self
      if obj is None:
        path = None
      else:
        path = '/'.join(obj.getPhysicalPath())
      db = self.getSQLConnection()
      quote = db.string_literal
      # One sub-query per queue, combined into a single UNION ALL statement.
      subquery_list = [queue.hasActivitySQL(quote, path=path, **kw)
                       for queue in activity_dict.itervalues()]
      sql = "(%s)" % ") UNION ALL (".join(subquery_list)
      # Result is the truthiness of the second item of the query result
      # (presumably the returned rows).
      return bool(db.query(sql)[1])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1393

1394
    security.declarePrivate('getActivityBuffer')
1395 1396 1397 1398 1399 1400 1401 1402
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activity buffer for this thread for this activity tool.

        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one, remember it for this thread and return it.
        If none is found and create_if_not_found is False, return None
        without remembering anything, so that a later call asking for
        creation does get a real buffer.
        Intermediate level is unconditionally created if non existent because
        chances are it will be used in the instance life.
      """
      # XXX: using a volatile attribute to cache getPhysicalPath result.
      # This cache may need invalidation if all the following is
      # simultaneously true:
      # - ActivityTool instances can be moved in object tree
      # - moved instance is used to get access to its activity buffer
      # - another instance is put in the place of the original, and used to
      #   access its activity buffer
      # ...which seems currently unlikely, and as such is left out.
      try:
        my_instance_key = self._v_physical_path
      except AttributeError:
        # Safeguard: make sure we are wrapped in acquisition context before
        # using our path as an activity tool instance-wide identifier.
        assert getattr(self, 'aq_self', None) is not None
        self._v_physical_path = my_instance_key = self.getPhysicalPath()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      my_thread_key = get_ident()
      try:
        activity_buffer = thread_activity_buffer[my_thread_key]
      except KeyError:
        activity_buffer = None
      # A missing entry and an entry holding None are treated alike. Earlier
      # code stored None on a non-creating lookup, which then shadowed every
      # later creating lookup for the same thread.
      if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer()
        thread_activity_buffer[my_thread_key] = activity_buffer
      return activity_buffer
1429

1430 1431
    def activateObject(self, object, activity=DEFAULT_ACTIVITY,
                       active_process=None, serialization_tag=None,
                       node=None, uid=None, **kw):
      # Build the ActiveWrapper used to post activities on an object.
      #
      # object          -- either a '/'-separated path string or an object
      #                    providing getPhysicalPath()
      # active_process  -- None, a path string (deprecated) or an object
      #                    providing getUid() and getPhysicalPath()
      # node            -- None (let this tool decide), '' (no preference),
      #                    'same' (current node) or a value passed to
      #                    getFamilyId()
      # uid             -- only accepted together with a path string;
      #                    ValueError is raised if given with an object
      # Remaining keyword arguments are handed to the wrapper as activity
      # parameters.
      # NOTE(review): documented with comments rather than a docstring, as
      # Zope's publisher takes docstring presence into account - confirm
      # before adding one.
      if active_process is None:
        active_process_uid = None
      elif isinstance(active_process, str):
        # TODO: deprecate
        active_process_uid = self.unrestrictedTraverse(active_process).getUid()
      else:
        active_process_uid = active_process.getUid()
        active_process = active_process.getPhysicalPath()

      if isinstance(object, str):
        url = tuple(object.split('/'))
      else:
        if uid is not None:
          raise ValueError
        uid = getattr(aq_base(object), 'uid', None)
        url = object.getPhysicalPath()

      if serialization_tag is not None:
        kw['serialization_tag'] = serialization_tag

      # Decide whether the activity should be bound to the current node.
      if node is None:
        # The caller lets us decide whether we prefer to execute on same node
        # (to increase the efficiency of the ZODB Storage cache).
        # A grouped activity is the sign we may have many of them so make
        # sure that this node won't overprioritize too many activities.
        prefer_current_node = not (
          isinstance(object, NO_DEFAULT_NODE_PREFERENCE)
          or kw.get('group_method_id', '') != '')
      elif node == '':
        prefer_current_node = False
      elif node != 'same':
        kw['node'] = self.getFamilyId(node)
        prefer_current_node = False
      else:
        prefer_current_node = True

      if prefer_current_node:
        try:
          kw['node'] = 1 + self.getNodeList(
            role=ROLE_PROCESSING).index(getCurrentNode())
        except ValueError:
          # Current node is not a processing node: leave 'node' unset.
          pass

      return ActiveWrapper(self, url, uid, activity,
                           active_process, active_process_uid, kw,
                           getattr(self, 'REQUEST', None))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1473

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1474
    def getRegisteredMessageList(self, activity):
      # Return what activity.getRegisteredMessageList() reports for the
      # current thread's activity buffer, or an empty list when this thread
      # has no buffer (none is created here).
      # NOTE: a disabled "activity_buffer._register()" call used to sit here,
      # annotated as required if flush is called outside activate.
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
      if activity_buffer is None:
        return []
      return activity.getRegisteredMessageList(activity_buffer,
                                               aq_inner(self))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1482

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1483
    def unregisterMessage(self, activity, message):
      # Delegate to activity.unregisterMessage() with the current thread's
      # activity buffer (fetched with default arguments) and this tool
      # stripped of its outer acquisition wrappers.
      # NOTE: a disabled "activity_buffer._register()" call used to sit here.
      current_buffer = self.getActivityBuffer()
      activity_tool = aq_inner(self)
      return activity.unregisterMessage(current_buffer, activity_tool, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1487

1488
    def flush(self, obj, invoke=0, **kw):
      # Call flush() on every queue for the given object.
      # obj is either a physical path tuple or an object providing
      # getPhysicalPath(); invoke and extra keyword arguments are forwarded
      # unchanged to each queue.
      # The buffer itself is not used here; the call is kept for its side
      # effects.
      self.getActivityBuffer()
      object_path = obj if isinstance(obj, tuple) else obj.getPhysicalPath()
      for queue in activity_dict.itervalues():
        queue.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1496 1497

    def invoke(self, message):
      # Execute a single activity message.
      #
      # When self is acquisition-wrapped, its acquisition chain is rebuilt on
      # top of a clone of the current REQUEST, patched with the values found
      # in message.request_info, and the message is called with that
      # rewrapped tool. Afterwards the skin selection is restored (as well as
      # the global request when it was replaced), and objects held by the
      # cloned REQUEST are handed over to the original REQUEST.
      # When self is not wrapped, the message is called with self as is.
      # NOTE(review): documented with comments rather than a docstring, as
      # Zope's publisher takes docstring presence into account - confirm
      # before adding one.
      if self.activity_tracking:
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
      restore_request = False
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extract base objects.
        base_chain = [aq_base(x) for x in self.aq_chain]
        # Grab existing request container (last chain item); a copy of its
        # request is created below.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # running unit tests. Recreate it if it does not exist.
        # request.other is accessed as a mapping, so the key has to be looked
        # up with .get(): getattr() on it never finds 'PARENTS', which made
        # this overwrite unconditional. An empty value is replaced too.
        if not request.other.get('PARENTS'):
          request.other['PARENTS'] = parents
        # XXX: PATH_INFO might not be set when running unit tests.
        if request.environ.get('PATH_INFO') is None:
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'

        # restore request information
        new_request = request.clone()
        request_info = message.request_info
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
        if '_script' in request_info:
          new_request._script = request_info['_script']
        if 'SERVER_URL' in request_info:
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
          # The global request is only swapped (and inputs processed) when a
          # language was recorded; it is put back in the finally clause below.
          old_request = getRequest()
          restore_request = True
          setRequest(new_request)
          new_request.processInputs()

        new_request_container = request_container.__class__(REQUEST=new_request)
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      try:
        message(my_self)
      finally:
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
        if restore_request:
          setRequest(old_request)
      if self.activity_tracking:
        activity_tracking_logger.info('invoked message')
      if my_self is not self: # We rewrapped self
        # Hand over objects held by the cloned request to the original one.
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1564

1565
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
      # Invoke a group method on the objects of several messages at once.
      #
      # method_id       -- path of the group method, traversed from the portal
      # message_list    -- messages to process together
      # activity        -- activity name used when re-posting objects that
      #                    provide the message's 'alternate_method_id'
      # merge_duplicate -- when true, an object path is only kept once
      #
      # Each message ends up with its execution state set: MESSAGE_EXECUTED
      # when all its objects got a result, MESSAGE_NOT_EXECUTED otherwise.
      # Messages whose getObjectList() returns None are left untouched.
      # NOTE(review): documented with comments rather than a docstring, as
      # Zope's publisher takes docstring presence into account - confirm
      # before adding one.
      if self.activity_tracking:
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
      # Invoke a group method.
      message_dict = {}
      path_set = set()
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
      for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          object_list = m.getObjectList(self)
          if object_list is None:
            continue
          message_dict[m] = expanded_object_list = []
          for subobj in object_list:
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
              path_set.add(path)
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
              expanded_object_list.append(GroupedMessage(subobj, m))
        except:
          # NOTE(review): if the failure happens after message_dict[m] was
          # assigned, m stays in message_dict and its state may be set again
          # below - confirm this is intended.
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

      # Flatten the per-message lists in a single pass (repeated list
      # concatenation through sum() copies the accumulated list every time).
      expanded_object_list = [grouped_message
                              for grouped_list in message_dict.itervalues()
                              for grouped_message in grouped_list]
      try:
        if expanded_object_list:
          # Store site info
          setSite(self.getParentValue())
          traverse = self.getPortalObject().unrestrictedTraverse
          # FIXME: how to apply security here?
          # NOTE: The callee must update each processed item of
          #       expanded_object_list, by setting:
          #       - 'exc_info' in case of error
          #       - 'result' otherwise, with None or the result to post
          #          on the active process
          #       Skipped item must not be touched.
          traverse(method_id)(expanded_object_list)
      except:
        # In this case, the group method completely failed.
        exc_info = sys.exc_info()
        for m in message_dict:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, [x.object for x in expanded_object_list]),
            error=exc_info)
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      else:
        # Note there can be partial failures.
        for m, expanded_object_list in message_dict.iteritems():
          result_list = []
          for result in expanded_object_list:
            try:
              if result.result is not None:
                result_list.append(result)
            except AttributeError:
              # No 'result' attribute: the callee either recorded 'exc_info'
              # or did not touch the item at all.
              exc_info = getattr(result, "exc_info", (SkippedMessage,))
              break # failed or skipped message
          else:
            try:
              # traverse is necessarily bound here: a non-empty result_list
              # implies the group method was traversed and called above.
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
                  m.activateResult(active_process, result.result, result.object)
            except:
              exc_info = None
            else:
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
              continue
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, context=self)
      # Drop the local reference to any exception information.
      exc_info = None
      if self.activity_tracking:
        activity_tracking_logger.info('invoked group messages')
1656

1657 1658 1659 1660
    security.declarePrivate('dummyGroupMethod')
    class dummyGroupMethod(object):
      # Traversal helper: traversing any name on the instance yields a group
      # method which calls the method of that name on the object of each
      # grouped message, one message after the other.
      def __bobo_traverse__(self, REQUEST, method_id):
        def group_method(message_list):
          # Switch user only when the message's user differs from the one of
          # the previously processed message.
          current_user_name = None
          saved_security_manager = getSecurityManager()
          try:
            for grouped_message in message_list:
              message = grouped_message._message
              if current_user_name != message.user_name:
                current_user_name = message.user_name
                message.changeUser(current_user_name, grouped_message.object)
              method = getattr(grouped_message.object, method_id)
              grouped_message.result = method(*grouped_message.args,
                                              **grouped_message.kw)
          except Exception:
            # Processing stops at the first failure, which is recorded on the
            # message being processed; following messages are left untouched.
            grouped_message.raised()
          finally:
            setSecurityManager(saved_security_manager)
        return group_method
    # The class name is rebound to its single instance.
    dummyGroupMethod = dummyGroupMethod()

1677 1678
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      # Build a Message from the given parameters and queue it on the queue
      # registered under the name `activity`.
      # Some Security Checking should be made here XXX
      self.getActivityBuffer()
      queue = activity_dict[activity]
      activity_tool = aq_inner(self)
      new_message = Message(path, active_process, activity_kw, method_id,
                            args, kw, portal_activities=self)
      queue.queueMessage(activity_tool, new_message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1684

1685
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1686 1687 1688 1689 1690 1691
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"

        object_path may be a '/'-separated string or a path tuple.
        When REQUEST is given, redirect to manageActivities afterwards.
      """
      if type(object_path) is str:
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=1)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'manageActivities')
      return REQUEST.RESPONSE.redirect(target_url)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1696

1697
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageRestart')
1698 1699 1700 1701
    def manageRestart(self, message_uid_list, activity, REQUEST=None):
      """
        Restart one or several messages

        message_uid_list may be a single uid or a list of uids. Nothing is
        assigned when the list is empty.
        When REQUEST is given, redirect to the tool's view afterwards.
      """
      if not isinstance(message_uid_list, list):
        message_uid_list = [message_uid_list]
      if message_uid_list:
        queue = activity_dict[activity]
        queue.assignMessageList(self.getSQLConnection(), 0, message_uid_list)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'view')
      return REQUEST.RESPONSE.redirect(target_url)

1711
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"

        Deprecated: a warning is logged on every call, use manageDelete.
        object_path may be a '/'-separated string or a path tuple.
        When REQUEST is given, redirect to manageActivities afterwards.
      """
      LOG('ActivityTool', WARNING,
          '"manageCancel" method is deprecated, use "manageDelete" instead.')
      if type(object_path) is str:
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=0)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'manageActivities')
      return REQUEST.RESPONSE.redirect(target_url)
1724 1725 1726 1727 1728 1729

    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageDelete' )
    def manageDelete(self, message_uid_list, activity, REQUEST=None):
      """
        Delete one or several messages

        message_uid_list may be a single uid or a list of uids.
        When REQUEST is given, redirect to the tool's view afterwards.
      """
      if not isinstance(message_uid_list, list):
        message_uid_list = [message_uid_list]
      queue = activity_dict[activity]
      queue.deleteMessageList(self.getSQLConnection(), message_uid_list)
      if REQUEST is None:
        return
      target_url = '%s/%s' % (self.absolute_url(), 'view')
      return REQUEST.RESPONSE.redirect(target_url)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1737

1738 1739
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1740
    def manageClearActivities(self, keep=1, RESPONSE=None):
      """
        Recreate tables, clearing all activities

        Every queue is initialized with clear=True. The keep argument is
        accepted but not used here.
        When RESPONSE is given, redirect to manageActivitiesAdvanced with a
        status message.
      """
      for queue in activity_dict.itervalues():
        queue.initialize(self, clear=True)

      if RESPONSE is None:
        return
      return RESPONSE.redirect(self.absolute_url_path() +
        '/manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared')
1750

1751 1752 1753 1754 1755 1756 1757 1758 1759
    security.declarePublic('getMessageTempObjectList')
    def getMessageTempObjectList(self, **kw):
      """
        Get object list of messages waiting in queues

        Each message returned by getMessageList(**kw) is turned into a
        temporary object carrying a copy of the message's attributes.
      """
      def as_temp_object(sql_message):
        temp_message = self.newContent(temp_object=1)
        temp_message.__dict__.update(**sql_message.__dict__)
        return temp_message
      return [as_temp_object(sql_message)
              for sql_message in self.getMessageList(**kw)]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1764
    security.declarePublic('getMessageList')
1765
    def getMessageList(self, activity=None, **kw):
      """
        List messages waiting in queues

        activity -- name of a single queue to list; when false, the messages
                    of all queues are concatenated.
        Extra keyword arguments are forwarded to the queues.
      """
      if activity:
        return activity_dict[activity].getMessageList(aq_inner(self), **kw)

      collected_message_list = []
      for queue in activity_dict.itervalues():
        try:
          collected_message_list.extend(
            queue.getMessageList(aq_inner(self), **kw))
        except AttributeError:
          # Log and carry on with the other queues.
          LOG('getMessageList, could not get message from Activity:',0,queue)
      return collected_message_list

1780 1781 1782 1783 1784
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.

        Shortcut for countMessage(tag=value).
      """
      tag_filter = {'tag': value}
      return self.countMessage(**tag_filter)
Sebastien Robin's avatar
Sebastien Robin committed
1786 1787 1788 1789 1790 1791 1792 1793 1794

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on a particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid

        The counts reported for every queue are summed.
      """
      db = self.getSQLConnection()
      quote = db.string_literal
      # One counting sub-query per queue, combined with UNION ALL.
      subquery_list = [queue.countMessageSQL(quote, **kw)
                       for queue in activity_dict.itervalues()]
      sql = "(%s)" % ") UNION ALL (".join(subquery_list)
      total = 0
      # Each item of the second element of the query result is a 1-tuple.
      for count, in db.query(sql)[1]:
        total += count
      return total
1804

1805
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1806
    def newActiveProcess(self, REQUEST=None, **kw):
      # Create an "Active Process" document in this tool and return it.
      # Keyword arguments are forwarded to newContent(). When REQUEST is
      # given, its response is additionally redirected to manage_main.
      # note: if one wants to create an Active Process without ERP5 products,
      # she can call ActiveProcess.addActiveProcess
      active_process = self.newContent(portal_type="Active Process", **kw)
      if REQUEST is not None:
        response = REQUEST['RESPONSE']
        response.redirect( 'manage_main' )
      return active_process
1813

1814
    security.declarePrivate('getDependentMessageList')
1815 1816 1817
    def getDependentMessageList(self, message, validating_queue=None):
      # Return the messages selected by the validation queries of all queues
      # for the given message, as a list of (queue, Message) pairs.
      # An empty tuple is returned when no queue provides a validation query.
      activity_kw = message.activity_kw
      db = self.getSQLConnection()
      quote = db.string_literal
      # Collect the non-empty validation query of every queue; each queue is
      # told whether it is the validating queue itself.
      query_list = []
      for queue in activity_dict.itervalues():
        validation_sql = queue.getValidationSQL(
          quote, activity_kw, queue is validating_queue)
        if validation_sql:
          query_list.append(validation_sql)
      if not query_list:
        return ()
      dependent_list = []
      union_sql = "(%s)" % ") UNION ALL (".join(query_list)
      for line in Results(db.query(union_sql)):
        queue = activity_dict[line.activity]
        dependent = Message.load(line.message,
                                 line=line,
                                 uid=line.uid,
                                 date=line.date,
                                 processing_node=line.processing_node)
        if not hasattr(dependent, 'order_validation_text'): # BBB
          dependent.order_validation_text = \
            queue.getOrderValidationText(dependent)
        dependent_list.append((queue, dependent))
      return dependent_list
1839

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1840 1841
    # Required for tests (time shift)
    def timeShift(self, delay):
      # Test helper: forward the given delay to timeShift() of every queue.
      for queue in activity_dict.itervalues():
        queue.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1844

1845
InitializeClass(ActivityTool)