Queue.py 11.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import cPickle, sys
30
from DateTime import DateTime
31
from zLOG import LOG, WARNING, ERROR
Yoshinori Okuji's avatar
Yoshinori Okuji committed
32
from ZODB.POSException import ConflictError
33 34 35
import sha
from cStringIO import StringIO

36
import transaction
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37

38 39 40 41 42 43
# Error values for message validation
# These are the possible return values of Queue.validate().
# EXCEPTION: validating the message raised an exception (other than a
# ZODB ConflictError, which is re-raised).
EXCEPTION      = -1
# VALID: the message passed every check.
VALID          = 0
# INVALID_PATH: the object designated by message.object_path does not exist.
INVALID_PATH   = 1
# INVALID_ORDER: an ordering constraint passed as keyword argument is not
# satisfied (activity_tool.validateOrder returned a true value).
INVALID_ORDER  = 2

44
# Time global parameters
45 46
# NOTE(review): neither constant is referenced in this module; presumably
# they are imported by the concrete queue implementations -- confirm.
# Presumably the longest time a message may stay in processing -- confirm.
MAX_PROCESSING_TIME = 900 # in seconds
# Presumably the delay applied after a validation error -- confirm.
VALIDATION_ERROR_DELAY = 30 # in seconds
47

Jean-Paul Smets's avatar
Jean-Paul Smets committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
class Queue:
  """
    Base implementation of an activity queue.

    It keeps per-processing-node "alive" and "awake" flags and provides
    default implementations (most of them no-ops) of the queue interface.
    Concrete queues are presumably expected to override the no-op methods.

    Original design roadmap:

    Step 1: use lists

    Step 2: add some object related dict which prevents calling twice the same method

    Step 3: add some time information for deferred execution

    Step 4: use MySQL as a way to store events (with locks)

    Step 5: use periodic Timer to wakeup Scheduler

    Step 6: add multiple threads on a single Scheduler

    Step 7: add control thread to kill "events which last too long"

    Some data:

    - reindexObject = 50 ms

    - calling a MySQL read = 0.7 ms

    - calling a simple method by HTTP = 30 ms

    - calling a complex method by HTTP = 500 ms

    References:

    http://www.mysql.com/doc/en/InnoDB_locking_reads.html
    http://www.python.org/doc/current/lib/thread-objects.html
    http://www-poleia.lip6.fr/~briot/actalk/actalk.html
  """

  #scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']

  def __init__(self):
    """Create a queue with no known processing node, not yet initialized."""
    # Both dictionaries are keyed by processing node and hold 0 / 1 flags.
    self.is_alive = {}
    self.is_awake = {}
    self.is_initialized = 0

  def initialize(self, activity_tool):
    """Mark the queue as initialized (only has an effect the first time)."""
    # This is the only moment when
    # we can set some global variables related
    # to the ZODB context
    if not self.is_initialized:
      self.is_initialized = 1

  def queueMessage(self, activity_tool, m):
    """Ask the activity tool to queue message m for this queue (deferred)."""
    activity_tool.deferredQueueMessage(self, m)

  def deleteMessage(self, activity_tool, m):
    """
      Ask the activity tool to delete message m from this queue (deferred)
      and flag the message as deleted.
    """
    if not getattr(m, 'is_deleted', 0):
      # We try not to delete twice
      # However this can not be guaranteed in the case of messages loaded from SQL
      activity_tool.deferredDeleteMessage(self, m)
    m.is_deleted = 1

  def dequeueMessage(self, activity_tool, processing_node):
    """
      No-op in this base class (returns None).

      tic() puts the queue to sleep for the node when this returns a true
      value.
    """
    pass

  def tic(self, activity_tool, processing_node):
    """
      Run one dequeue step for processing_node and put the queue to sleep
      for that node when dequeueMessage returns a true value.
    """
    # Tic should return quickly to prevent locks or commit transactions at some point
    if self.dequeueMessage(activity_tool, processing_node):
      self.sleep(activity_tool, processing_node)

  def distribute(self, activity_tool, node_count):
    """No-op in this base class."""
    pass

  def sleep(self, activity_tool, processing_node):
    """Flag the queue as not awake for processing_node."""
    self.is_awake[processing_node] = 0

  def wakeup(self, activity_tool, processing_node):
    """Flag the queue as awake for processing_node."""
    self.is_awake[processing_node] = 1

  def terminate(self, activity_tool, processing_node):
    """Flag the queue as neither awake nor alive for processing_node."""
    self.is_awake[processing_node] = 0
    self.is_alive[processing_node] = 0

  def validate(self, activity_tool, message, check_order_validation=1, **kw):
    """
      This is the place where activity semantics is implemented
      **kw contains all parameters which allow to implement synchronisation,
      constraints, delays, etc.

      Standard synchronisation parameters:

      after_method_id   --  never validate message if after_method_id
                            is in the list of methods which are
                            going to be executed

      after_message_uid --  never validate message if after_message_uid
                            is in the list of messages which are
                            going to be executed

      after_path        --  never validate message if after_path
                            is in the list of path which are
                            going to be executed

      Each keyword argument is handed to activity_tool.validateOrder, unless
      check_order_validation is false.

      Returns one of the module level constants:

      VALID         --  the message can be executed

      INVALID_PATH  --  the object at message.object_path does not exist

      INVALID_ORDER --  activity_tool.validateOrder returned a true value
                        for one of the keyword arguments

      EXCEPTION     --  validation raised an exception (it is logged)

      ZODB ConflictError is never swallowed: it is re-raised to the caller.
    """
    try:
      if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
        # Do not try to call methods on objects which do not exist
        LOG('CMFActivity', WARNING,
           'Object %s does not exist' % '/'.join(message.object_path))
        return INVALID_PATH
      if check_order_validation:
        # items() iterates the same pairs as iteritems() and exists on
        # every Python version.
        for k, v in kw.items():
          if activity_tool.validateOrder(message, k, v):
            return INVALID_ORDER
    except ConflictError:
      raise
    except:
      # Kept as a catch-all on purpose: whatever goes wrong here, the
      # message is reported as EXCEPTION instead of aborting the caller.
      LOG('CMFActivity', WARNING,
          'Validation of Object %s raised exception' % '/'.join(message.object_path),
          error=sys.exc_info())
      # Do not try to call methods on objects which cause errors
      return EXCEPTION
    return VALID

  def getDependentMessageList(self, activity_tool, message, **kw):
    """
      Return the concatenation of the non-empty results of
      activity_tool.getDependentMessageList(message, key, value) for every
      keyword argument. Returns an empty list when there is none.
    """
    message_list = []
    for k, v in kw.items():
      result = activity_tool.getDependentMessageList(message, k, v)
      if result:
        message_list.extend(result)
    return message_list

  def getExecutableMessageList(self, activity_tool, message, message_dict,
                               validation_text_dict, now_date=None):
    """Get messages which have no dependent message, and store them in the dictionary.

    If the passed message itself is executable, simply store only that message.
    Otherwise, try to find at least one message executable from dependent messages.

    This may result in no new message, if all dependent messages are already present
    in the dictionary, if all dependent messages are in different activities, or if
    the message has a circular dependency.

    The validation text dictionary is used only to cache the results of validations,
    in order to reduce the number of SQL queries.

    message_dict maps message uid to message and is updated in place; nothing
    is returned. validation_text_dict maps an order validation text to 1
    (executable) or 0 (not executable) and is updated in place as well.
    now_date defaults to the current DateTime, computed only when needed.
    """
    if message.uid in message_dict:
      # Nothing to do. But detect a circular dependency.
      if message_dict[message.uid] is None:
        LOG('CMFActivity', ERROR,
            'message uid %r has a circular dependency' % (message.uid,))
      return

    cached_result = validation_text_dict.get(message.order_validation_text)
    if cached_result is None:
      message_list = message.getDependentMessageList(self, activity_tool)
      transaction.commit() # Release locks.
      if message_list:
        # The result is not empty, so this message is not executable.
        validation_text_dict[message.order_validation_text] = 0
        if now_date is None:
          now_date = DateTime()
        for activity, m in message_list:
          # Note that the messages may contain ones which are already assigned or not
          # executable yet.
          if activity is self and m.processing_node == -1 and m.date <= now_date:
            # Call recursively. Set None as a marker to detect a circular dependency.
            message_dict[message.uid] = None
            try:
              self.getExecutableMessageList(activity_tool, m, message_dict,
                                             validation_text_dict, now_date=now_date)
            finally:
              del message_dict[message.uid]
      else:
        validation_text_dict[message.order_validation_text] = 1
        message_dict[message.uid] = message
    elif cached_result:
      message_dict[message.uid] = message
    # Otherwise the cache says "not executable": nothing to store.

  def isAwake(self, activity_tool, processing_node):
    """
      Return the awake flag (0 or 1) of the queue for processing_node.

      A node which was never passed to wakeup, sleep or terminate is
      reported as not awake instead of raising KeyError.
    """
    return self.is_awake.get(processing_node, 0)

  def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
    """Always return 0 in this base class."""
    return 0

  def flush(self, activity_tool, object, **kw):
    """No-op in this base class."""
    pass

  def start(self, active_process=None):
    """No-op in this base class."""
    # Start queue / activities in queue for given process
    pass

  def stop(self, active_process=None):
    """No-op in this base class."""
    # Stop queue / activities in queue for given process
    pass

  def loadMessage(self, s, **kw):
    """
      Rebuild a message from its pickled form s, then set every keyword
      argument as an attribute of the message. Returns the message.
    """
    # SECURITY NOTE(review): unpickling can execute arbitrary code. s must
    # only ever come from a trusted store -- confirm this for all callers.
    m = cPickle.load(StringIO(s))
    m.__dict__.update(kw)
    return m

  def dumpMessage(self, m):
    """Return message m serialized with cPickle."""
    return cPickle.dumps(m)

  def getOrderValidationText(self, message):
    """
      Return an identifier of the validators related to ordering.

      Only the keys of message.activity_kw for which this queue has a
      "_validate_<key>" attribute are taken into account. The result is the
      SHA1 hexdigest of the repr of the sorted (key, value) list, or the
      string 'none' when no such key is present.
    """
    # Sorting makes the result independent from dictionary ordering.
    order_validation_item_list = []
    for key in sorted(message.activity_kw.keys()):
      method_id = "_validate_%s" % key
      if getattr(self, method_id, None) is not None:
        order_validation_item_list.append((key, message.activity_kw[key]))
    if not order_validation_item_list:
      # When no order validation argument is specified, skip the computation
      # of the checksum for speed. Here, 'none' is used, because this never be
      # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
      # is true in Python. This is important, because dtml-if assumes that an empty
      # string is false, so we must use a non-empty string for this.
      return 'none'
    return sha.new(repr(order_validation_item_list)).hexdigest()

  def getMessageList(self, activity_tool, processing_node=None,**kw):
    """Always return an empty list in this base class."""
    return []

  def countMessage(self, activity_tool,**kw):
    """Always return 0 in this base class."""
    return 0

  def countMessageWithTag(self, activity_tool,value):
    """Always return 0 in this base class."""
    return 0

  # Transaction Management
  def prepareQueueMessage(self, activity_tool, m):
    """No-op in this base class."""
    # Called to prepare transaction commit for queued messages
    pass

  def finishQueueMessage(self, activity_tool_path, m):
    """No-op in this base class."""
    # Called to commit queued messages
    pass

  def prepareDeleteMessage(self, activity_tool, m):
    """No-op in this base class."""
    # Called to prepare transaction commit for deleted messages
    pass

  def finishDeleteMessage(self, activity_tool_path, m):
    """No-op in this base class."""
    # Called to commit deleted messages
    pass

  # Registration Management
  def registerActivityBuffer(self, activity_buffer):
    """No-op in this base class."""
    pass

  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """
      Return whether m is in the list of messages which activity_buffer
      holds for this queue.
    """
    # NOTE(review): unregisterMessage does not remove m from that list, so
    # an unregistered message still gives a true result -- confirm intent.
    message_list = activity_buffer.getMessageList(self)
    return m in message_list

  def registerMessage(self, activity_buffer, activity_tool, m):
    """
      Append m to the list of messages which activity_buffer holds for
      this queue and flag m as registered.
    """
    message_list = activity_buffer.getMessageList(self)
    message_list.append(m)
    m.is_registered = 1

  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """Flag m as not registered (it is not removed from the buffer list)."""
    m.is_registered = 0

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the messages which activity_buffer holds for this queue and
      which are still flagged as registered.
    """
    message_list = activity_buffer.getMessageList(self)
    return [m for m in message_list if m.is_registered]

  # Required for tests (time shift)
  def timeShift(self, activity_tool, delay):
    """
      delay is provided in fractions of day

      No-op in this base class.
    """
    pass

  def getPriority(self, activity_tool):
    """
      Get priority from this queue.
      Lower number means higher priority value.
      Legal value range is [1, 6].
      Values out of this range might work, but are non-standard.
    """
    return 6