SQLDict.py 28.4 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
30
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from Products.CMFActivity.ActivityTool import registerActivity
32
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33
from RAMDict import RAMDict
34
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
35
from ZODB.POSException import ConflictError
36
import sys
37
import sha
38
from types import ClassType, StringType, ListType, TupleType
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
# Bind get_transaction from the transaction package when it can be imported.
# NOTE(review): when the import fails the name is left unbound here; presumably
# it is then expected to be provided as a builtin by older Zope releases —
# confirm against the supported Zope versions.
try:
  from transaction import get as get_transaction
except ImportError:
  pass

Jean-Paul Smets's avatar
Jean-Paul Smets committed
45 46
from zLOG import LOG

47
# Priority threshold: a failing message whose priority is greater than this
# value is assigned to an error state instead of being rescheduled with a
# lower priority (see validateMessage and dequeueMessage).
MAX_PRIORITY = 5
48
# Once the number of objects gathered for one group method call reaches this
# value, dequeueMessage stops adding further messages to the group.
MAX_GROUPED_OBJECTS = 500
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49

50 51 52 53 54 55
# Pool from which dequeueMessage draws (with random.choice) the priority level
# it looks at first. A priority is picked with a probability proportional to
# the number of times it appears: 64, 20, 10, 5 and 1 entries out of 100 for
# priorities 1 to 5.
priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1
56

57 58 59
class ActivityFlushError(Exception):
    """Error during active message flush.

    Raised by SQLDict.flush when a flushed message could not be executed
    once invoked, or when its validation reports that the document does
    not exist.
    """

Jean-Paul Smets's avatar
Jean-Paul Smets committed
60 61 62 63 64 65
class SQLDict(RAMDict):
  """
    A simple OOBTree based queue. It should be compatible with transactions
    and provide sequentiality. Should not create conflict
    because use of OOBTree.
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
66
  # Transaction commit methods
67
  def prepareQueueMessage(self, activity_tool, m):
    """
      Write message m through activity_tool.SQLDict_writeMessage.

      Nothing is written when m is not registered (m.is_registered false).
      Optional values are read from m.activity_kw: priority (default 1),
      broadcast (default 0), at_date (default: current DateTime),
      group_method_id and tag (default: empty string).
    """
    if not m.is_registered:
      return
    activity_kw = m.activity_kw
    activity_tool.SQLDict_writeMessage(
      path='/'.join(m.object_path),
      method_id=m.method_id,
      priority=activity_kw.get('priority', 1),
      broadcast=activity_kw.get('broadcast', 0),
      message=self.dumpMessage(m),
      date=activity_kw.get('at_date', DateTime()),
      group_method_id=activity_kw.get('group_method_id', ''),
      tag=activity_kw.get('tag', ''),
      order_validation_text=self.getOrderValidationText(m))
    # Also store uid of activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80 81 82 83 84 85
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write all registered messages of message_list with a single call to
      activity_tool.SQLDict_writeMessageList.

      Messages which are not registered are skipped, and nothing is written
      when no registered message remains. Every *_list argument holds one
      value per registered message, in the order of message_list. Messages
      without at_date all share the same current DateTime.
    """
    registered = [message for message in message_list if message.is_registered]
    if len(registered) == 0:
      return

    def collect(key, default):
      # One activity_kw value per registered message
      return [message.activity_kw.get(key, default) for message in registered]

    path_list = ['/'.join(message.object_path) for message in registered]
    method_id_list = [message.method_id for message in registered]
    priority_list = collect('priority', 1)
    broadcast_list = collect('broadcast', 0)
    dumped_message_list = [self.dumpMessage(message) for message in registered]
    default_date = DateTime()
    date_list = collect('at_date', default_date)
    group_method_id_list = collect('group_method_id', '')
    tag_list = collect('tag', '')
    order_validation_text_list = [self.getOrderValidationText(message)
                                  for message in registered]
    activity_tool.SQLDict_writeMessageList(
      path_list=path_list,
      method_id_list=method_id_list,
      priority_list=priority_list,
      broadcast_list=broadcast_list,
      message_list=dumped_message_list,
      date_list=date_list,
      group_method_id_list=group_method_id_list,
      tag_list=tag_list,
      order_validation_text_list=order_validation_text_list)
106
                                                         
107 108
  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete every stored message sharing the path, method_id and order
      validation text of m.

      The uids are read with SQLDict_readUidList (processing_node=None) and
      erased with one SQLDict_delMessage call; nothing is deleted when no
      uid is found.
    """
    message_path = '/'.join(m.object_path)
    validation_text = self.getOrderValidationText(m)
    row_list = activity_tool.SQLDict_readUidList(
      path=message_path,
      method_id=m.method_id,
      order_validation_text=validation_text,
      processing_node=None)
    uid_list = [row.uid for row in row_list]
    if len(uid_list) > 0:
      # Erase all messages in a single call
      activity_tool.SQLDict_delMessage(uid=uid_list)

  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
119
  def registerActivityBuffer(self, activity_buffer):
    """
      Initialise on activity_buffer the two attributes used by this activity
      class: an empty '_<class name>_uid_dict' dictionary and an empty
      '_<class name>_message_list' list.
    """
    attribute_prefix = '_%s' % self.__class__.__name__
    setattr(activity_buffer, attribute_prefix + '_uid_dict', {})
    setattr(activity_buffer, attribute_prefix + '_message_list', [])

Jean-Paul Smets's avatar
Jean-Paul Smets committed
124
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """
      Tell whether a message with the same object path and method id as m
      is recorded in the uid dictionary of activity_buffer.
    """
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % self.__class__.__name__)
    message_key = (tuple(m.object_path), m.method_id)
    return message_key in uid_dict
128

Jean-Paul Smets's avatar
Jean-Paul Smets committed
129 130
  def registerMessage(self, activity_buffer, activity_tool, m):
    """
      Flag m as registered, record its (object path, method id) key in the
      uid dictionary of activity_buffer and append m to its message list.
    """
    m.is_registered = 1
    attribute_prefix = '_%s' % self.__class__.__name__
    uid_dict = getattr(activity_buffer, attribute_prefix + '_uid_dict')
    message_key = (tuple(m.object_path), m.method_id)
    uid_dict[message_key] = 1
    message_list = getattr(activity_buffer, attribute_prefix + '_message_list')
    message_list.append(m)
135

Jean-Paul Smets's avatar
Jean-Paul Smets committed
136 137
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Flag m as not registered and forget its (object path, method id) key
      in the uid dictionary of activity_buffer, if it was recorded there.

      The message list of the buffer is left untouched.
    """
    # This prevents from inserting deleted messages into the queue
    m.is_registered = 0
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % self.__class__.__name__)
    message_key = (tuple(m.object_path), m.method_id)
    if message_key in uid_dict:
      del uid_dict[message_key]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
142 143

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the list of messages of activity_buffer which are still flagged
      as registered, or an empty tuple when the buffer has no message list
      for this activity class.
    """
    attribute_name = '_%s_message_list' % self.__class__.__name__
    if not hasattr(activity_buffer, attribute_name):
      return ()
    buffered_list = getattr(activity_buffer, attribute_name)
    return [message for message in buffered_list if message.is_registered]
150

151 152 153 154 155 156 157 158 159 160
  def getOrderValidationText(self, message):
    """
      Return an identifier of the validators related to ordering.

      Only the activity_kw keys for which this class has a '_validate_<key>'
      method are taken into account. The result is the SHA1 hexdigest of the
      repr of the sorted (key, value) pairs, or 'none' when there is no
      such key.
    """
    activity_kw = message.activity_kw
    key_list = activity_kw.keys()
    key_list.sort()
    order_validation_item_list = [(key, activity_kw[key]) for key in key_list
                                  if hasattr(self, "_validate_%s" % key)]
    if len(order_validation_item_list) > 0:
      return sha.new(repr(order_validation_item_list)).hexdigest()
    # When no order validation argument is specified, the computation of the
    # checksum is skipped for speed. 'none' can never be identical to a SHA1
    # hexdigest (which is always 40 characters long), and it is true in
    # Python. This matters because dtml-if treats an empty string as false,
    # so a non-empty string must be used here.
    return 'none'
    
  def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
    """
      Validate message and reschedule it when it is not valid.

      Returns 1 when message.validate reports VALID, 0 otherwise. In the
      latter case the transaction is committed after one of these actions:
        - EXCEPTION or INVALID_PATH with priority above MAX_PRIORITY: the
          messages of uid_list are assigned to VALIDATE_ERROR_STATE
        - EXCEPTION or INVALID_PATH otherwise: the messages of uid_list are
          delayed and their priority is set to priority + 1
        - any other state (INVALID_ORDER): the messages of processing_node
          with the same order validation text are delayed, priority unchanged
    """
    validation_state = message.validate(self, activity_tool)
    if validation_state is VALID:
      return 1
    if validation_state not in (EXCEPTION, INVALID_PATH):
      # We do not lower priority for INVALID_ORDER errors but we do postpone execution
      activity_tool.SQLDict_setPriority(
        order_validation_text=self.getOrderValidationText(message),
        processing_node=processing_node,
        delay=VALIDATION_ERROR_DELAY,
        retry=1,
        uid=None)
    elif priority > MAX_PRIORITY:
      # Serious validation error on a message which can not be lowered
      # any more: assign it back to 'error' state
      if len(uid_list) > 0:
        activity_tool.SQLDict_assignMessage(uid=uid_list,
                                            processing_node=VALIDATE_ERROR_STATE)
      #m.notifyUser(activity_tool)                                      # Notify Error
    else:
      # Serious validation error: lower priority and add some delay before
      # new processing
      if len(uid_list) > 0:
        activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                          priority=priority + 1, retry=1)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    return 0
  
Jean-Paul Smets's avatar
Jean-Paul Smets committed
199
  # Queue semantic
Jean-Paul Smets's avatar
Jean-Paul Smets committed
200
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Select one message scheduled for now on processing_node and try to
      execute it, together with other messages sharing its group method.

      A priority level is first drawn from priority_weight; when no message
      is found at that level, any message scheduled for now is accepted.
      Selected messages are flagged as processing, validated with
      validateMessage and invoked. Afterwards each message is either deleted
      (executed), delayed (ConflictError), sent to INVOKE_ERROR_STATE
      (priority above MAX_PRIORITY) or delayed with a lowered priority.

      Returns 0 when a message was selected, whatever the outcome of its
      validation or invocation. Returns 1 when nothing was selected, or when
      activity_tool provides no SQLDict_readMessage.
    """
    if not hasattr(activity_tool, 'SQLDict_readMessage'):
      return 1
    now_date = DateTime()
    priority = random.choice(priority_weight)
    # Try to find a message at given priority level which is scheduled for now
    result = activity_tool.SQLDict_readMessage(processing_node=processing_node,
                                               priority=priority, to_date=now_date)
    if len(result) == 0:
      # If empty, take any message which is scheduled for now
      priority = None
      result = activity_tool.SQLDict_readMessage(processing_node=processing_node,
                                                 priority=priority, to_date=now_date)
    if len(result) == 0:
      # If the result is still empty, shift the dates so that SQLDict can
      # dispatch pending active objects quickly.
      self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node, retry=1)
      get_transaction().commit() # Release locks before starting a potentially long calculation
      return 1

    line = result[0]
    order_validation_text = line.order_validation_text
    uid_list = [x.uid for x in activity_tool.SQLDict_readUidList(
                  path=line.path, method_id=line.method_id,
                  processing_node=None, to_date=now_date,
                  order_validation_text=order_validation_text)]
    # The three lists below are kept parallel: one entry per selected message
    uid_list_list = [uid_list]
    priority_list = [line.priority]
    # Make sure message can not be processed anylonger
    if len(uid_list) > 0:
      # Set selected messages to processing
      activity_tool.SQLDict_processMessage(uid=uid_list)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
    m = self.loadMessage(line.message, uid=line.uid)
    message_list = [m]
    # Validate message (make sure object exists, priority OK, etc.)
    if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
      return 0

    group_method_id = m.activity_kw.get('group_method_id')
    if group_method_id is not None:
      # Count the number of objects to prevent too many objects.
      if m.hasExpandMethod():
        try:
          count = len(m.getObjectList(activity_tool))
        except:
          # Here, simply ignore an exception. The same exception should be handled later.
          LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
          count = 0
      else:
        count = 1

      # The traversed object itself is not used below; the lookup is kept
      # because it raises when the group method can not be reached.
      activity_tool.restrictedTraverse(group_method_id)

      if count < MAX_GROUPED_OBJECTS:
        # Retrieve objects which have the same group method.
        result = activity_tool.SQLDict_readMessage(processing_node=processing_node,
                                                   priority=priority,
                                                   to_date=now_date,
                                                   group_method_id=group_method_id,
                                                   order_validation_text=order_validation_text)
        #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
        for line in result:
          uid_list = [x.uid for x in activity_tool.SQLDict_readUidList(
                        path=line.path, method_id=line.method_id,
                        processing_node=None, to_date=now_date,
                        order_validation_text=order_validation_text)]
          if len(uid_list) > 0:
            # Set selected messages to processing
            activity_tool.SQLDict_processMessage(uid=uid_list)
          get_transaction().commit() # Release locks before starting a potentially long calculation
          m = self.loadMessage(line.message, uid=line.uid)
          if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
            continue
          if m.hasExpandMethod():
            try:
              count += len(m.getObjectList(activity_tool))
            except:
              # Here, simply ignore an exception. The same exception should be handled later.
              LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
          else:
            count += 1
          message_list.append(m)
          uid_list_list.append(uid_list)
          priority_list.append(line.priority)
          if count >= MAX_GROUPED_OBJECTS:
            break

    get_transaction().commit() # Release locks before starting a potentially long calculation
    # Try to invoke
    if group_method_id is not None:
      LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count))
      activity_tool.invokeGroup(group_method_id, message_list)
    else:
      activity_tool.invoke(message_list[0])

    # Check if messages are executed successfully.
    # When some of them are executed successfully, it may not be acceptable to
    # abort the transaction, because these remain pending, only due to other
    # invalid messages. This means that a group method should not be used if
    # it has a side effect. For now, only indexing uses a group method, and this
    # has no side effect.
    for m in message_list:
      if m.is_executed:
        break
    else:
      # No message at all was executed
      get_transaction().abort()

    for m, uid_list, priority in zip(message_list, uid_list_list, priority_list):
      if m.is_executed:
        activity_tool.SQLDict_delMessage(uid=uid_list)                    # Delete it
        get_transaction().commit()                                        # If successful, commit
        if m.active_process:
          active_process = activity_tool.unrestrictedTraverse(m.active_process)
          if not active_process.hasActivity():
            # No more activity
            m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
      elif isinstance(m.exc_type, (ClassType, type)) and issubclass(m.exc_type, ConflictError):
        # If this is a conflict error, do not lower the priority but only delay.
        # Both old-style (ClassType) and new-style (type) exception classes
        # are accepted: since Python 2.5 exceptions are new-style classes and
        # would not be recognised by a ClassType-only test.
        activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                          retry=1)
        get_transaction().commit() # Release locks before starting a potentially long calculation
      elif priority > MAX_PRIORITY:
        # This is an error
        if len(uid_list) > 0:
          # Assign message back to 'error' state
          activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=INVOKE_ERROR_STATE)
        m.notifyUser(activity_tool)                                       # Notify Error
        get_transaction().commit()                                        # and commit
      else:
        # Lower priority
        if len(uid_list) > 0:
          activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                            priority=priority + 1, retry=1)
          get_transaction().commit() # Release locks before starting a potentially long calculation

    return 0

345
  def hasActivity(self, activity_tool, object, **kw):
    """
      Tell whether messages are pending for object.

      activity_tool -- provider of the SQLDict_hasMessage method
      object        -- document whose physical path is looked up, or None
      kw            -- extra criteria passed as is to SQLDict_hasMessage

      Returns a true value when SQLDict_hasMessage reports a message_count
      greater than 0 for the path of object, and 1 when object is None.
      Returns 0 when activity_tool has no SQLDict_hasMessage method or when
      the query returns no line.
    """
    # Test for the method which is really called below (the availability of
    # SQLDict_readMessageList says nothing about SQLDict_hasMessage).
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    if object is None:
      # Default behaviour if no object specified is to return 1 until
      # active_process implemented
      return 1
    my_object_path = '/'.join(object.getPhysicalPath())
    result = hasMessage(path=my_object_path, **kw)
    if len(result) > 0:
      return result[0].message_count > 0
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
356
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove the messages of object_path from the queue, optionally invoking
      them first.

      object_path is a tuple

      method_id restricts the flush to one method when it is not None.

      When invoke is true, each distinct method is validated and invoked
      once. ActivityFlushError is raised when an invoked message is not
      executed, or when validation reports INVALID_PATH.

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used; the value is not read by this
      implementation.

      NOTE: committing is very likely nonsense here. We should just avoid
      flushing as much as possible.
    """
    path = '/'.join(object_path)
    # LOG('Flush', 0, str((path, invoke, method_id)))
    if not hasattr(activity_tool, 'SQLDict_readMessageList'):
      return

    def invoke_or_raise(message, message_path):
      # First validate, then try to invoke the message - what happens if
      # invoke calls flushActivity ??
      validate_value = message.validate(self, activity_tool)
      if validate_value is VALID:
        activity_tool.invoke(message)
        if not message.is_executed:
          # Make sure message could be invoked
          raise ActivityFlushError(
            'Could not evaluate %s on %s' % (message.method_id, message_path))
      elif validate_value is INVALID_PATH:
        # The message no longer exists
        raise ActivityFlushError(
          'The document %s does not exist' % message_path)

    # Method ids which were already handled: prevents calling invoke twice
    method_dict = {}

    # Parse each message in registered
    wanted_path = list(object_path)
    for m in activity_tool.getRegisteredMessageList(self):
      if list(m.object_path) != wanted_path:
        continue
      if method_id is not None and method_id != m.method_id:
        continue
      activity_tool.unregisterMessage(self, m)
      if m.method_id in method_dict:
        continue
      method_dict[m.method_id] = 1
      if invoke:
        invoke_or_raise(m, path)

    # Parse each message in SQL dict
    result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,
                                                   processing_node=None,
                                                   include_processing=0)
    for line in result:
      line_method_id = line.method_id
      if line_method_id in method_dict:
        continue
      # Only invoke once (it would be different for a queue)
      # This is optimisation with the goal to process objects on the same
      # node and minimize network traffic with ZEO server
      method_dict[line_method_id] = 1
      m = self.loadMessage(line.message, uid=line.uid)
      self.deleteMessage(activity_tool, m)
      if invoke:
        invoke_or_raise(m, line.path)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
420

421
  def getMessageList(self, activity_tool, processing_node=None, include_processing=0, **kw):
    """
      Return the messages read by SQLDict_readMessageList, loaded with
      loadMessage and completed with the processing_node, priority and
      processing values of their line.

      Only include_processing is passed on to the query: processing_node
      and kw are not used to filter the result. An empty list is returned
      when activity_tool has no SQLDict_readMessageList.
    """
    # YO: reading all lines might cause a deadlock
    message_list = []
    if not hasattr(activity_tool, 'SQLDict_readMessageList'):
      return message_list
    line_list = activity_tool.SQLDict_readMessageList(
      path=None,
      method_id=None,
      processing_node=None,
      to_processing_date=None,
      include_processing=include_processing)
    for line in line_list:
      message = self.loadMessage(line.message, uid=line.uid)
      message.processing_node = line.processing_node
      message.priority = line.priority
      message.processing = line.processing
      message_list.append(message)
    return message_list

434 435 436 437 438 439 440 441 442 443
  def dumpMessageList(self, activity_tool):
    """
      Dump all messages in the table: return the list of messages loaded
      from every line of SQLDict_dumpMessageList, or an empty list when
      activity_tool does not provide that method.
    """
    if not hasattr(activity_tool, 'SQLDict_dumpMessageList'):
      return []
    return [self.loadMessage(line.message, uid=line.uid)
            for line in activity_tool.SQLDict_dumpMessageList()]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
444 445
  def distribute(self, activity_tool, node_count):
    """
      Assign the messages which have no processing node yet
      (processing_node = -1) to the processing nodes 1..node_count.

      A broadcast message is assigned to node 1 and written again for each
      of the other nodes. The other messages are assigned per path, each
      new path going to the next node in turn (round robin).

      Does nothing when activity_tool has no SQLDict_readMessageList.
    """
    if not hasattr(activity_tool, 'SQLDict_readMessageList'):
      return
    now_date = DateTime()
    if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
      # Sticky processing messages should be set back to non processing
      max_processing_date = now_date - MAX_PROCESSING_TIME
      self.max_processing_date = now_date
    else:
      max_processing_date = None
    # Only assign non assigned messages
    line_list = activity_tool.SQLDict_readMessageList(
      path=None, method_id=None, processing_node=-1,
      to_processing_date=max_processing_date,
      include_processing=0)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    next_node = 1
    assigned_path_dict = {}
    for line in line_list:
      path = line.path
      if line.broadcast:
        # Broadcast messages must be distributed into all nodes.
        activity_tool.SQLDict_assignMessage(processing_node=1, uid=[line.uid])
        if node_count > 1:
          for node in range(2, node_count + 1):
            activity_tool.SQLDict_writeMessage(path=path,
                                               method_id=line.method_id,
                                               priority=line.priority,
                                               broadcast=1,
                                               processing_node=node,
                                               message=line.message,
                                               date=line.date)
        continue
      if path in assigned_path_dict:
        # Only assign once (it would be different for a queue)
        continue
      assigned_path_dict[path] = 1
      activity_tool.SQLDict_assignMessage(path=path, processing_node=next_node,
                                          uid=None, broadcast=0)
      get_transaction().commit() # Release locks immediately to allow processing of messages
      next_node = next_node + 1
      if next_node > node_count:
        next_node = 1 # Round robin
Jean-Paul Smets's avatar
Jean-Paul Smets committed
483

484 485 486
  # Validation private methods
  def _validate_after_method_id(self, activity_tool, message, value):
    """
      Count the number of occurrences of the given method id(s).

      value is a method id or a sequence of method ids. Returns
      INVALID_ORDER when SQLDict_validateMessageList reports a uid_count
      greater than 0 for them, VALID otherwise.
    """
    method_id_list = value
    if type(method_id_list) is StringType:
      method_id_list = [method_id_list]
    if len(method_id_list) == 0:
      # if empty list provided, the message is valid
      return VALID
    result = activity_tool.SQLDict_validateMessageList(method_id=method_id_list,
                                                       message_uid=None, path=None)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
494

495 496
  def _validate_after_path(self, activity_tool, message, value):
    """
      Count the number of occurrences of the given path(s).

      value is a path or a sequence of paths. Returns INVALID_ORDER when
      SQLDict_validateMessageList reports a uid_count greater than 0 for
      them, VALID otherwise.
    """
    path_list = value
    if type(path_list) is StringType:
      path_list = [path_list]
    if len(path_list) == 0:
      # if empty list provided, the message is valid
      return VALID
    result = activity_tool.SQLDict_validateMessageList(method_id=None,
                                                       message_uid=None, path=path_list)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID
504

505 506 507 508 509 510 511
  def _validate_after_message_uid(self, activity_tool, message, value):
    """
      Count the number of occurrences of the given message uid.

      Returns INVALID_ORDER when SQLDict_validateMessageList reports a
      uid_count greater than 0 for message_uid=value, VALID otherwise.
    """
    count_line = activity_tool.SQLDict_validateMessageList(
      method_id=None, message_uid=value, path=None)[0]
    if count_line.uid_count > 0:
      return INVALID_ORDER
    return VALID

512 513
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    """
      Count the number of occurrences of the given path(s) and method id(s).

      value must be a tuple or a list whose first two items are the path(s)
      and the method id(s); any other value is logged and considered VALID.
      Returns INVALID_ORDER when SQLDict_validateMessageList reports a
      uid_count greater than 0, VALID otherwise.
    """
    if type(value) not in (TupleType, ListType) or len(value) < 2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method_id : %s' % repr(value))
      return VALID
    path, method = value[0], value[1]
    if type(path) is StringType:
      path = [path]
    if type(method) is StringType:
      method = [method]
    count_line = activity_tool.SQLDict_validateMessageList(
      method_id=method, message_uid=None, path=path)[0]
    if count_line.uid_count > 0:
      return INVALID_ORDER
    return VALID

528 529
  def _validate_after_tag(self, activity_tool, message, value):
    """
      Count the number of occurrences of the given tag(s).

      value is a tag or a sequence of tags. Returns INVALID_ORDER when
      SQLDict_validateMessageList reports a uid_count greater than 0 for
      them, VALID otherwise.
    """
    tag_list = value
    if type(tag_list) is StringType:
      tag_list = [tag_list]
    count_line = activity_tool.SQLDict_validateMessageList(
      method_id=None, message_uid=None, tag=tag_list)[0]
    if count_line.uid_count > 0:
      return INVALID_ORDER
    return VALID
    
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    """
      Count the number of occurrences of the given tag(s) and method id(s).

      value must be a tuple or a list whose first two items are the tag(s)
      and the method id(s); any other value is logged and considered VALID.
      Returns INVALID_ORDER when SQLDict_validateMessageList reports a
      uid_count greater than 0, VALID otherwise.
    """
    if type(value) not in (TupleType, ListType) or len(value) < 2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
      return VALID
    tag, method = value[0], value[1]
    if type(tag) is StringType:
      tag = [tag]
    if type(method) is StringType:
      method = [method]
    count_line = activity_tool.SQLDict_validateMessageList(
      method_id=method, message_uid=None, tag=tag)[0]
    if count_line.uid_count > 0:
      return INVALID_ORDER
    return VALID

553
  # Required for tests (time shift)
554
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
    """
      Simulate a shift in time by calling SQLDict_timeShift, which is
      expected to subtract delay from all dates in the SQLDict message
      table. processing_node and retry are passed to it unchanged.
    """
    shift_kw = {
      'delay': delay,
      'processing_node': processing_node,
      'retry': retry,
    }
    activity_tool.SQLDict_timeShift(**shift_kw)
560

Jean-Paul Smets's avatar
Jean-Paul Smets committed
561
# Pass the SQLDict class to ActivityTool.registerActivity when this module
# is imported.
registerActivity(SQLDict)