SQLQueue.py 18 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
Jean-Paul Smets's avatar
Jean-Paul Smets committed
30
from Products.CMFActivity.ActivityTool import registerActivity
31
from RAMQueue import RAMQueue
32
from DateTime import DateTime
33
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
34
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
35
from Products.CMFActivity.Errors import ActivityFlushError
Romain Courteaud's avatar
Romain Courteaud committed
36
from ZODB.POSException import ConflictError
37
from types import StringType, ClassType
38
import sys
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
try:
  from transaction import get as get_transaction
except ImportError:
  pass

45
from zLOG import LOG, WARNING
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46

47 48 49 50 51 52 53 54 55 56
MAX_PRIORITY = 5

priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1

class SQLQueue(RAMQueue):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
57
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
58 59 60
    A simple OOBTree based queue. It should be compatible with transactions
    and provide sequentiality. Should not create conflict
    because use of OOBTree.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
61
  """
62
  def prepareQueueMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
63 64 65 66
    if m.is_registered:
      activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
                                          method_id = m.method_id,
                                          priority = m.activity_kw.get('priority', 1),
67
                                          broadcast = m.activity_kw.get('broadcast', 0),
68
                                          message = self.dumpMessage(m),
69 70
                                          date = m.activity_kw.get('at_date', DateTime()),
                                          tag = m.activity_kw.get('tag', ''))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
71

72 73
  def prepareDeleteMessage(self, activity_tool, m):
    # Erase all messages in a single transaction
74
    #LOG("prepareDeleteMessage", 0, str(m.__dict__))
75
    activity_tool.SQLQueue_delMessage(uid = [m.uid])
76

Jean-Paul Smets's avatar
Jean-Paul Smets committed
77
  def dequeueMessage(self, activity_tool, processing_node):
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
    readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
    if readMessage is None:
      return 1

    now_date = DateTime()
    # Next processing date in case of error
    next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
    priority = random.choice(priority_weight)
    # Try to find a message at given priority level
    result = readMessage(processing_node=processing_node, priority=priority,
                         to_date=now_date)
    if len(result) == 0:
      # If empty, take any message
      result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
    if len(result) > 0:
      line = result[0]
      path = line.path
      method_id = line.method_id
      # Make sure message can not be processed anylonger
      activity_tool.SQLQueue_processMessage(uid=line.uid)
      get_transaction().commit() # Release locks before starting a potentially long calculation
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116

      # At this point, the message is marked as processed.
      try:
        m = self.loadMessage(line.message)
        # Make sure object exists
        validation_state = m.validate(self, activity_tool)
        if validation_state is not VALID:
          if validation_state in (EXCEPTION, INVALID_PATH):
            if line.priority > MAX_PRIORITY:
              # This is an error.
              # Assign message back to 'error' state.
              activity_tool.SQLQueue_assignMessage(uid=line.uid,
                                                   processing_node = VALIDATE_ERROR_STATE)
              get_transaction().commit()                                        # and commit
            else:
              # Lower priority
              activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
              get_transaction().commit() # Release locks before starting a potentially long calculation
117
          else:
118 119 120
            # We do not lower priority for INVALID_ORDER errors but we do postpone execution
            activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
                                                priority = line.priority)
121
            get_transaction().commit() # Release locks before starting a potentially long calculation
122
          return 0
123 124

        # Try to invoke
125
        activity_tool.invoke(m) # Try to invoke the message
126 127
        if m.is_executed:                                          # Make sure message could be invoked
          get_transaction().commit()                                        # If successful, commit
128
      except:
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
        # If an exception occurs, abort the transaction to minimize the impact,
        try:
          get_transaction().abort()
        except:
          # Unfortunately, database adapters may raise an exception against abort.
          LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
          pass

        if issubclass(sys.exc_info()[0], ConflictError):
          # If a conflict occurs, delay the operation.
          activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
                                             priority = line.priority)
        else:
          # For the other exceptions, put it into an error state.
          activity_tool.SQLQueue_assignMessage(uid = line.uid, 
                                               processing_node = INVOKE_ERROR_STATE)
145 146
          LOG('SQLQueue', WARNING, 'Error in ActivityTool.invoke', e=sys.exc_info())

147 148 149
        get_transaction().commit()
        return 0

150

151
      if m.is_executed:
152
        activity_tool.SQLQueue_delMessage(uid=[line.uid])  # Delete it
153
      else:
154 155 156 157 158 159 160 161
        try:
          # If not, abort transaction and start a new one
          get_transaction().abort()
        except:
          # Unfortunately, database adapters may raise an exception against abort.
          LOG('SQLQueue', WARNING, 'abort failed, thus some objects may be modified accidentally')
          pass

162 163 164 165 166 167
        if type(m.exc_type) is ClassType \
                and issubclass(m.exc_type, ConflictError):
          activity_tool.SQLQueue_setPriority(uid = line.uid, 
                                             date = next_processing_date,
                                             priority = line.priority)
        elif line.priority > MAX_PRIORITY:
168
          # This is an error
169 170
          activity_tool.SQLQueue_assignMessage(uid = line.uid, 
                                               processing_node = INVOKE_ERROR_STATE)
171 172
                                                                            # Assign message back to 'error' state
          m.notifyUser(activity_tool)                                       # Notify Error
173
        else:
174 175 176
          # Lower priority
          activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
                                              priority = line.priority + 1)
177
      get_transaction().commit()
178 179
      return 0
    get_transaction().commit() # Release locks before starting a potentially long calculation
Jean-Paul Smets's avatar
Jean-Paul Smets committed
180
    return 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
181

182
  def hasActivity(self, activity_tool, object, **kw):
183 184
    hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
    if hasMessage is not None:
185 186
      if object is not None:
        my_object_path = '/'.join(object.getPhysicalPath())
187
        result = hasMessage(path=my_object_path, **kw)
188 189 190 191
        if len(result) > 0:
          return result[0].message_count > 0
      else:
        return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
Jean-Paul Smets's avatar
Jean-Paul Smets committed
192
    return 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
193

194
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
195 196
    """
      object_path is a tuple
197 198 199 200 201 202 203 204

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
Jean-Paul Smets's avatar
Jean-Paul Smets committed
205
    """
206 207
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is not None:
208 209 210 211 212 213 214 215 216
      #return # Do nothing here to precent overlocking
      path = '/'.join(object_path)
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
        if object_path == m.object_path and (method_id is None or method_id == m.method_id):
          if invoke: activity_tool.invoke(m)
          activity_tool.unregisterMessage(self, m)
      # Parse each message in SQL queue
      #LOG('Flush', 0, str((path, invoke, method_id)))
217
      result = readMessageList(path=path, method_id=method_id,processing_node=None)
218 219 220 221 222 223 224 225 226 227 228
      #LOG('Flush', 0, str(len(result)))
      method_dict = {}
      for line in result:
        path = line.path
        method_id = line.method_id
        if not method_dict.has_key(method_id):
          # Only invoke once (it would be different for a queue)
          method_dict[method_id] = 1
          m = self.loadMessage(line.message, uid = line.uid)
          if invoke:
            # First Validate
229
            if m.validate(self, activity_tool) is VALID:
230 231 232 233 234 235
              activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
              if not m.is_executed:                                                 # Make sure message could be invoked
                # The message no longer exists
                raise ActivityFlushError, (
                    'Could not evaluate %s on %s' % (method_id , path))
            else:
236 237
              # The message no longer exists
              raise ActivityFlushError, (
238
                  'The document %s does not exist' % path)
239 240 241

      if len(result):
        activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
242

243 244 245
  # def start(self, activity_tool, active_process=None):
  #   uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
  #   activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = DISTRIBUTABLE_STATE)
246

247 248 249
  # def stop(self, activity_tool, active_process=None):
  #   uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
  #   activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = STOP_STATE)
250

251
  def getMessageList(self, activity_tool, processing_node=None,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
252
    message_list = []
253 254 255
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is not None:
      result = readMessageList(path=None, method_id=None, processing_node=None)
256 257 258 259 260
      for line in result:
        m = self.loadMessage(line.message)
        m.processing_node = line.processing_node
        m.priority = line.priority
        message_list.append(m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
261 262
    return message_list

Sebastien Robin's avatar
Sebastien Robin committed
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
  def countMessage(self, activity_tool, tag=None,path=None,
                   method_id=None,message_uid=None,**kw):
    """
      Return the number of message which match the given parameter.
    """
    if isinstance(tag, StringType):
      tag = [tag]
    if isinstance(path, StringType):
      path = [path]
    if isinstance(message_uid, (int,long)):
      message_uid = [message_uid]
    if isinstance(method_id, StringType):
      method_id = [method_id]
    result = activity_tool.SQLQueue_validateMessageList(method_id=method_id, 
                                                       path=path,
                                                       message_uid=message_uid, 
                                                       tag=tag)
    return result[0].uid_count

  def countMessageWithTag(self, activity_tool, value):
    """
      Return the number of message which match the given tag.
    """
    return self.countMessage(activity_tool,tag=value)


289 290 291
  def dumpMessageList(self, activity_tool):
    # Dump all messages in the table.
    message_list = []
292 293 294
    dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
    if dumpMessageList is not None:
      result = dumpMessageList()
295 296 297 298
      for line in result:
        m = self.loadMessage(line.message, uid = line.uid)
        message_list.append(m)
    return message_list
299

300 301
  def distribute(self, activity_tool, node_count):
    processing_node = 1
302 303 304
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is not None:
      result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
305 306 307
      #LOG('distribute count',0,str(len(result)) )
      #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
      #get_transaction().commit() # Release locks before starting a potentially long calculation
308 309 310 311 312 313 314
      result = list(result)[0:100]
      for line in result:
        broadcast = line.broadcast
        uid = line.uid
        if broadcast:
          # Broadcast messages must be distributed into all nodes.
          activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
315 316 317 318 319 320 321 322 323
          if node_count > 1:
            for node in range(2, node_count+1):
              activity_tool.SQLQueue_writeMessage( path = line.path,
                                                  method_id = line.method_id,
                                                  priority = line.priority,
                                                  broadcast = 1,
                                                  processing_node = node,
                                                  message = line.message,
                                                  date = line.date)
324 325 326 327 328 329 330
        else:
          #LOG("distribute", 0, "assign %s" % uid)
          activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
          #get_transaction().commit() # Release locks immediately to allow processing of messages
          processing_node = processing_node + 1
          if processing_node > node_count:
            processing_node = 1 # Round robin
331

332 333 334
  # Validation private methods
  def _validate_after_method_id(self, activity_tool, message, value):
    # Count number of occurances of method_id
335 336 337
    #get_transaction().commit()
    if type(value) == type(''):
      value = [value]
338
    result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
339 340
    #LOG('SQLQueue._validate_after_method_id, method_id',0,value)
    #LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
341 342 343
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
344

345 346
  def _validate_after_path(self, activity_tool, message, value):
    # Count number of occurances of path
347 348
    if type(value) == type(''):
      value = [value]
349 350 351 352
    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
353

354 355 356 357 358 359 360
  def _validate_after_message_uid(self, activity_tool, message, value):
    # Count number of occurances of message_uid
    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of method_id and path
    if (type(value) != type( (0,) ) and type(value) != type ([])) or len(value)<2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method : %s' % repr(value))
      return VALID
    path = value[0]
    method = value[1]
    if type(path) == type(''):
      path = [path]
    if type(method) == type(''):
      method = [method]
    result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, path=path)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
376

377 378 379 380 381 382 383 384
  def _validate_after_tag(self, activity_tool, message, value):
    # Count number of occurances of tag
    if type(value) == type(''):
      value = [value]
    result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, tag=value)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
385

386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
    if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
      return VALID
    tag = value[0]
    method = value[1]
    if type(tag) == type(''):
      tag = [tag]
    if type(method) == type(''):
      method = [method]
    result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, tag=tag)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
401

402
  # Required for tests (time shift)
403
  def timeShift(self, activity_tool, delay, processing_node = None):
404 405 406 407
    """
      To simulate timeShift, we simply substract delay from
      all dates in SQLDict message table
    """
408
    activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node)
409

Jean-Paul Smets's avatar
Jean-Paul Smets committed
410
registerActivity(SQLQueue)