ActivityTool.py 11.7 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solane <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from Products.CMFCore import CMFCorePermissions
30
from Products.ERP5Type.Document.Folder import Folder
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31 32
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthenticatedUser
33
from Globals import InitializeClass, DTMLFile, get_request
Jean-Paul Smets's avatar
Jean-Paul Smets committed
34 35 36 37 38 39 40 41 42 43 44
from Acquisition import aq_base
from DateTime.DateTime import DateTime
import threading

from zLOG import LOG

# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
45
tic_lock = threading.Lock() # A RAM based lock
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46 47 48 49 50 51 52 53 54 55 56 57 58 59

# Activity Registration
activity_dict = {}
activity_list = []

def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
  LOG('Init Activity', 0, str(activity.__name__))
  activity_instance = activity()
  activity_list.append(activity_instance)
  activity_dict[activity.__name__] = activity_instance

class Message:
60
  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
61 62 63 64
    if type(object) is type('a'):
      self.object_path = object.split('/')
    else:
      self.object_path = object.getPhysicalPath()
65 66 67 68 69 70
    if type(active_process) is type('a'):
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
71 72 73 74
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
75
    self.is_executed = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
76 77 78 79
    # User Info ? REQUEST Info ?

  def __call__(self, activity_tool):
    try:
80 81
      LOG('WARNING ActivityTool', 0,
           'Trying to call method %s on object %s' % (self.method_id, self.object_path))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82
      object = activity_tool.unrestrictedTraverse(self.object_path)
83 84 85 86 87 88
      REQUEST = get_request()
      REQUEST.active_process = self.active_process
      result = getattr(object, self.method_id)(*self.args, **self.kw)
      if REQUEST.active_process is not None:
        active_process = activity_tool.getActiveProcess()
        active_process.activateResult(result) # XXX Allow other method_id in future
Jean-Paul Smets's avatar
Jean-Paul Smets committed
89
      self.is_executed = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
90
    except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
91
      self.is_executed = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
92 93 94 95 96 97 98 99
      LOG('WARNING ActivityTool', 0,
           'Could not call method %s on object %s' % (self.method_id, self.object_path))

  def validate(self, activity, activity_tool):
    return activity.validate(activity_tool, self, **self.activity_kw)

class Method:

100
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
101 102
    self.__passive_self = passive_self
    self.__activity = activity
103
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
104 105 106 107
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
108
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
109 110 111 112
    activity_dict[self.__activity].queueMessage(self.__passive_self.portal_activities, m)

class ActiveWrapper:

113
  def __init__(self, passive_self, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
114 115
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
116
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
117 118 119 120
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
121
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
122 123
                  self.__dict__['__kw'], id)

124
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
125
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
126 127 128 129 130 131 132 133 134 135 136 137
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
138 139 140
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
141
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
142 143
    security = ClassSecurityInfo()

144 145
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
146 147
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
                     ,
148
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
149 150 151 152

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )

    def __init__(self):
        return Folder.__init__(self, ActivityTool.id)

    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

Jean-Paul Smets's avatar
Jean-Paul Smets committed
169 170
    def initialize(self):
      global is_initialized
171
      from Activity import RAMQueue, RAMDict, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
172 173 174 175 176
      # Initialize each queue
      for activity in activity_list:
        activity.initialize(self)
      is_initialized = 1

Jean-Paul Smets's avatar
Jean-Paul Smets committed
177 178 179 180 181 182 183 184 185 186
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
      if not is_initialized: self.initialize()

      # Call distribute on each queue
      for activity in activity_list:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
187 188
        try:
        #if 1:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
189
          activity.distribute(self, node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
190 191
        except:
        #else:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
192 193
          LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
194
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
195
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
196 197
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
198
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
199 200 201 202
      """
      global active_threads, is_initialized

      # return if the number of threads is too high
Jean-Paul Smets's avatar
Jean-Paul Smets committed
203
      if active_threads >= max_active_threads:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
204 205
        if not force: return 'Too many threads'

Jean-Paul Smets's avatar
Jean-Paul Smets committed
206
      if tic_lock is None:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
207 208 209 210 211 212
        return

      # Initialize if needed
      if not is_initialized: self.initialize()

      # increase the number of active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
213
      tic_lock.acquire()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
214
      active_threads += 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
215
      tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
216 217 218 219

      # Wakeup each queue
      for activity in activity_list:
        try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
220
          activity.wakeup(self, processing_node)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
221 222 223 224 225 226 227 228
        except:
          LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)

      # Process messages on each queue in round robin
      has_awake_activity = 1
      while has_awake_activity:
        has_awake_activity = 0
        for activity in activity_list:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
229 230
          try:
          #if 1:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
231 232
            activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
            has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
233 234
          except:
          #else:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
235 236 237
            LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity)

      # decrease the number of active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
238
      tic_lock.acquire()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
239
      active_threads -= 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
240
      tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
241 242 243 244 245 246 247 248

    def hasActivity(self, object, **kw):
      # Check in each queue if the object has deferred tasks
      for activity in activity_list:
        if activity.hasActivity(self, object, **kw):
          return 1
      return 0

249
    def activate(self, object, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
250 251
      global is_initialized
      if not is_initialized: self.initialize()
252
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
253 254 255 256 257 258 259 260 261 262 263 264

    def flush(self, object, invoke=0, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      object_path = object.getPhysicalPath()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__)
        activity.flush(self, object_path, invoke=invoke, **kw)

    def invoke(self, message):
      message(self)

265 266
    def newMessage(self, activity, path, active_process, activity_kw, method_id, *args, **kw):
      # Some Security Cheking should be made here XXX
Jean-Paul Smets's avatar
Jean-Paul Smets committed
267 268
      global is_initialized
      if not is_initialized: self.initialize()
269
      activity_dict[activity].queueMessage(self, Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297

    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      for activity in activity_list:
        activity.flush(self, object_path, method_id=method_id, invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      for activity in activity_list:
        activity.flush(self, object_path, method_id=method_id, invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    security.declarePublic('getMessageList')
    def getMessageList(self):
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
298 299 300
      # Initialize if needed
      if not is_initialized: self.initialize()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
301 302 303 304 305
      message_list = []
      for activity in activity_list:
        message_list += activity.getMessageList(self)
      return message_list

306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
    def newActiveProcess(self):
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
      return self._getOb(new_id)

    def reindexObject(self):
      self.immediateReindexObject()

    def getActiveProcess(self):
      REQUEST = get_request()
      if REQUEST.active_process:
        return self.unrestrictedTraverse(REQUEST.active_process)
      return None


Jean-Paul Smets's avatar
Jean-Paul Smets committed
323
InitializeClass(ActivityTool)