##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import pickle
import sys

from Acquisition import aq_base
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import Message
from zLOG import LOG
from ZODB.POSException import ConflictError

# Error values for message validation (result codes of Queue.validate)
EXCEPTION      = -1  # validation itself raised an exception
VALID          = 0   # message passed every check
INVALID_PATH   = 1   # traversing message.object_path returned None
INVALID_ORDER  = 2   # activity_tool.validateOrder() reported a conflict

# Time global parameters
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 30 # in seconds
class Queue:
  """
    Base implementation of an activity queue.

    Most operations defined here are no-op hooks (they only `pass` or
    return an empty value); presumably concrete queue classes override
    them. What this class does implement is the per-node awake/alive
    bookkeeping, message validation, message (un)pickling and the
    registration of messages on an activity buffer.

    Original design roadmap:

    Step 1: use lists

    Step 2: add some object related dict which prevents calling twice the same method

    Step 3: add some time information for deferred execution

    Step 4: use MySQL as a way to store events (with locks)

    Step 5: use periodic Timer to wakeup Scheduler

    Step 6: add multiple threads on a single Scheduler

    Step 7: add control thread to kill "events which last too long"

    Some data:

    - reindexObject = 50 ms

    - calling a MySQL read = 0.7 ms

    - calling a simple method by HTTP = 30 ms

    - calling a complex method by HTTP = 500 ms

    References:

    http://www.mysql.com/doc/en/InnoDB_locking_reads.html
    http://www.python.org/doc/current/lib/thread-objects.html
    http://www-poleia.lip6.fr/~briot/actalk/actalk.html
  """

  #scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']

  def __init__(self):
    # Per processing node flags, keyed by processing_node
    self.is_alive = {}
    self.is_awake = {}
    self.is_initialized = 0
    self.max_processing_date = DateTime()

  def initialize(self, activity_tool):
    """
      Mark the queue as initialized (only the first call has an effect).
    """
    # This is the only moment when
    # we can set some global variables related
    # to the ZODB context
    if not self.is_initialized:
      self.is_initialized = 1

  def queueMessage(self, activity_tool, m):
    """
      Hand message m over to activity_tool.deferredQueueMessage.
    """
    activity_tool.deferredQueueMessage(self, m)

  def deleteMessage(self, activity_tool, m):
    """
      Hand message m over to activity_tool.deferredDeleteMessage, unless m
      is already flagged as deleted, then flag it as deleted.
    """
    if not getattr(m, 'is_deleted', 0):
      # We try not to delete twice
      # However this can not be guaranteed in the case of messages loaded from SQL
      activity_tool.deferredDeleteMessage(self, m)
    m.is_deleted = 1

  def dequeueMessage(self, activity_tool, processing_node):
    """
      Hook: does nothing here and returns None.
    """
    pass

  def tic(self, activity_tool, processing_node):
    """
      Call dequeueMessage and put the node to sleep if it returns a true
      value.
    """
    # Tic should return quickly to prevent locks or commit transactions at some point
    if self.dequeueMessage(activity_tool, processing_node):
      self.sleep(activity_tool, processing_node)

  def distribute(self, activity_tool, node_count):
    """
      Hook: does nothing here.
    """
    pass

  def sleep(self, activity_tool, processing_node):
    """
      Flag processing_node as not awake.
    """
    self.is_awake[processing_node] = 0

  def wakeup(self, activity_tool, processing_node):
    """
      Flag processing_node as awake.
    """
    self.is_awake[processing_node] = 1

  def terminate(self, activity_tool, processing_node):
    """
      Flag processing_node as neither awake nor alive.
    """
    self.is_awake[processing_node] = 0
    self.is_alive[processing_node] = 0

  def validate(self, activity_tool, message, **kw):
    """
      This is the place where activity semantics is implemented
      **kw contains all parameters which allow to implement synchronisation,
      constraints, delays, etc.

      Standard synchronisation parameters:

      after_method_id   --  never validate message if after_method_id
                            is in the list of methods which are
                            going to be executed

      after_message_uid --  never validate message if after_message_uid
                            is in the list of messages which are
                            going to be executed

      after_path        --  never validate message if after_path
                            is in the list of path which are
                            going to be executed

      Returns one of the module level codes:

      INVALID_PATH   --  traversing message.object_path returned None
      INVALID_ORDER  --  activity_tool.validateOrder returned a true value
                         for one of the **kw items
      EXCEPTION      --  any exception other than ConflictError was raised
      VALID          --  none of the above

      ConflictError is re-raised to the caller.
    """
    try:
      if activity_tool.unrestrictedTraverse(message.object_path) is None:
        # Do not try to call methods on objects which do not exist
        LOG('WARNING ActivityTool', 0,
           'Object %s does not exist' % '/'.join(message.object_path))
        return INVALID_PATH
      for k, v in kw.items():
        if activity_tool.validateOrder(message, k, v):
          return INVALID_ORDER
    except ConflictError:
      raise
    except:
      # Deliberately broad: whatever goes wrong is logged and reported
      # through the return code
      LOG('WARNING ActivityTool', 0,
          'Validation of Object %s raised exception' % '/'.join(message.object_path),
          error=sys.exc_info())
      # Do not try to call methods on objects which cause errors
      return EXCEPTION
    return VALID

  def isAwake(self, activity_tool, processing_node):
    """
      Return the awake flag of processing_node.

      A node for which neither sleep, wakeup nor terminate was called yet
      is reported as not awake (0) instead of raising KeyError.
    """
    return self.is_awake.get(processing_node, 0)

  def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
    """
      Hook: always returns 0 here.
    """
    return 0

  def flush(self, activity_tool, object, **kw):
    """
      Hook: does nothing here.
    """
    pass

  def start(self, active_process=None):
    # Start queue / activities in queue for given process
    pass

  def stop(self, active_process=None):
    # Stop queue / activities in queue for given process
    pass

  def loadMessage(self, s, **kw):
    """
      Unpickle a message from string s and set every **kw item as an
      attribute of the message (through its __dict__).
    """
    # NOTE(review): pickle.loads can execute arbitrary code; s must only
    # come from a trusted store - confirm against callers.
    m = pickle.loads(s)
    m.__dict__.update(kw)
    return m

  def dumpMessage(self, m):
    """
      Return message m as a pickle string.
    """
    return pickle.dumps(m)

  def getMessageList(self, activity_tool, processing_node=None):
    """
      Hook: always returns an empty list here.
    """
    return []

  # Transaction Management
  def prepareQueueMessage(self, activity_tool, m):
    # Called to prepare transaction commit for queued messages
    pass

  def finishQueueMessage(self, activity_tool, m):
    # Called to commit queued messages
    pass

  def prepareDeleteMessage(self, activity_tool, m):
    # Called to prepare transaction commit for deleted messages
    pass

  def finishDeleteMessage(self, activity_tool, m):
    # Called to commit deleted messages
    pass

  # Registration Management
  def _getMessageListName(self):
    # Name of the attribute holding, on an activity buffer, the messages
    # registered by this queue class
    return '_%s_message_list' % self.__class__.__name__

  def registerActivityBuffer(self, activity_buffer):
    """
      Attach a new empty message list for this queue class to
      activity_buffer (replacing any previous one).
    """
    setattr(activity_buffer, self._getMessageListName(), [])

  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """
      Tell whether m is in the message list of activity_buffer.

      A buffer on which registerActivityBuffer was not called holds no
      message, so the answer is false rather than an AttributeError.
    """
    return m in getattr(activity_buffer, self._getMessageListName(), ())

  def registerMessage(self, activity_buffer, activity_tool, m):
    """
      Append m to the message list of activity_buffer and flag it as
      registered. Raises AttributeError if the buffer has no message list.
    """
    getattr(activity_buffer, self._getMessageListName()).append(m)
    m.is_registered = 1

  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Flag m as not registered; m is left in the buffer message list.
    """
    m.is_registered = 0

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the list of messages of activity_buffer which are still
      flagged as registered, or an empty tuple if the buffer has no
      message list for this queue class.
    """
    message_list = getattr(activity_buffer, self._getMessageListName(), None)
    if message_list is None:
      return ()
    # Build a real list: filter() would give a lazy iterator on Python 3
    return [m for m in message_list if m.is_registered]

  # Required for tests (time shift)
  def timeShift(self, activity_tool, delay):
    """
      delay is provided in fractions of day
    """
    pass