SQLDict.py 29.2 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
30
from Products.CMFActivity.ActivityTool import registerActivity
31
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
Jean-Paul Smets's avatar
Jean-Paul Smets committed
32
from RAMDict import RAMDict
33
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
34
from Products.CMFActivity.Errors import ActivityFlushError
35
from ZODB.POSException import ConflictError
36
import sys
37
from types import ClassType
Jean-Paul Smets's avatar
Jean-Paul Smets committed
38

39 40 41 42 43
try:
  from transaction import get as get_transaction
except ImportError:
  pass

Yoshinori Okuji's avatar
Yoshinori Okuji committed
44
from zLOG import LOG, TRACE, WARNING, ERROR
Jean-Paul Smets's avatar
Jean-Paul Smets committed
45

46
# A message whose priority exceeds MAX_PRIORITY is put into an error state
# instead of being rescheduled (see validateMessage and dequeueMessage).
MAX_PRIORITY = 5

# dequeueMessage stops gathering messages for one group method invocation
# once this many objects have been collected.
MAX_GROUPED_OBJECTS = 500

# Table of 100 priority values: priority 1 appears 64 times, 2 appears
# 20 times, 3 appears 10 times, 4 appears 5 times and 5 appears once.
priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1
55

56

Jean-Paul Smets's avatar
Jean-Paul Smets committed
57 58 59 60 61 62
class SQLDict(RAMDict):
  """
    A dict-like activity queue whose messages are written, read and deleted
    through the SQLDict_* methods of the activity tool. It should be
    compatible with transactions and provide sequentiality.
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
63
  # Transaction commit methods
64
  def prepareQueueMessage(self, activity_tool, m):
    """
      Write a single message through activity_tool.SQLDict_writeMessage.

      Nothing is written when m.is_registered is false, which is how
      unregisterMessage prevents a message from being inserted.

      Values missing from m.activity_kw get the following defaults:
      priority 1, broadcast 0, at_date "now", group_method_id '' and tag ''.
    """
    if not m.is_registered:
      return
    activity_kw = m.activity_kw
    activity_tool.SQLDict_writeMessage(
      path='/'.join(m.object_path),
      method_id=m.method_id,
      priority=activity_kw.get('priority', 1),
      broadcast=activity_kw.get('broadcast', 0),
      message=self.dumpMessage(m),
      date=activity_kw.get('at_date', DateTime()),
      group_method_id=activity_kw.get('group_method_id', ''),
      tag=activity_kw.get('tag', ''),
      order_validation_text=self.getOrderValidationText(m))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
75
                                          # Also store uid of activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
76

77 78 79 80 81 82
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write all registered messages of message_list with a single call to
      activity_tool.SQLDict_writeMessageList.

      Messages whose is_registered flag is false are skipped, and nothing
      is written at all when no registered message remains. One uid per
      written message is requested from portal_ids (id_group
      'portal_activity'). Messages without an 'at_date' all share the same
      "now" date.
    """
    registered_message_list = [message for message in message_list
                               if message.is_registered]
    if not registered_message_list:
      return
    #LOG('SQLDict prepareQueueMessageList', 0, 'registered_message_list = %r' % (registered_message_list,))
    path_list = ['/'.join(message.object_path) for message in registered_message_list]
    method_id_list = [message.method_id for message in registered_message_list]
    priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
    broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
    dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
    now = DateTime()
    date_list = [message.activity_kw.get('at_date', now) for message in registered_message_list]
    group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list]
    tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
    order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
    id_tool = activity_tool.getPortalObject().portal_ids
    uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
                                               id_count=len(registered_message_list))
    activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
                                           path_list=path_list,
                                           method_id_list=method_id_list,
                                           priority_list=priority_list,
                                           broadcast_list=broadcast_list,
                                           message_list=dumped_message_list,
                                           date_list=date_list,
                                           group_method_id_list=group_method_id_list,
                                           tag_list=tag_list,
                                           order_validation_text_list=order_validation_text_list)
105

106 107
  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete every stored message which has the same path, method id and
      order validation text as m.

      The matching uids are read with SQLDict_readUidList and, if there are
      any, removed with one SQLDict_delMessage call.
    """
    # Erase all messages in a single transaction
    path = '/'.join(m.object_path)
    order_validation_text = self.getOrderValidationText(m)
    line_list = activity_tool.SQLDict_readUidList(path=path,
                                                  method_id=m.method_id,
                                                  order_validation_text=order_validation_text,
                                                  processing_node=None)
    uid_list = [line.uid for line in line_list]
    if uid_list:
      activity_tool.SQLDict_delMessage(uid=uid_list)

  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
118
  def registerActivityBuffer(self, activity_buffer):
    """
      Do nothing: this activity has no setup to perform on the buffer.
    """
    pass
120

Jean-Paul Smets's avatar
Jean-Paul Smets committed
121
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """
      Tell whether a message with the same object path, method id and tag
      as m is already present in the uid set of the activity buffer.
    """
    key = (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'))
    return key in activity_buffer.getUidSet(self)
124

Jean-Paul Smets's avatar
Jean-Paul Smets committed
125 126
  def registerMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as registered, record its (object path, method id, tag) key in
      the uid set of the buffer and append m to the buffer's message list.
    """
    m.is_registered = 1
    key = (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'))
    activity_buffer.getUidSet(self).add(key)
    activity_buffer.getMessageList(self).append(m)
131

Jean-Paul Smets's avatar
Jean-Paul Smets committed
132 133
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as not registered and drop its (object path, method id, tag)
      key from the uid set of the buffer.

      The message itself stays in the buffer's message list; it is the
      is_registered flag which keeps it from being queued.
    """
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
    key = (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'))
    # discard: no error if the key was never registered
    activity_buffer.getUidSet(self).discard(key)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
137 138

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the messages of the buffer whose is_registered flag is still
      true, in buffer order.
    """
    return [m for m in activity_buffer.getMessageList(self) if m.is_registered]
141

142
  def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
    """
      Validate message (without order validation) and reschedule the
      messages of uid_list if it is not valid.

      Return 1 if message.validate() answers VALID. Otherwise return 0 after:
        - assigning uid_list to VALIDATE_ERROR_STATE if priority is already
          above MAX_PRIORITY, or
        - delaying uid_list by VALIDATION_ERROR_DELAY with priority + 1,
      and committing the transaction in both cases.

      processing_node is accepted but not used by this method.
    """
    validation_state = message.validate(self, activity_tool, check_order_validation=0)
    if validation_state is VALID:
      return 1

    # There is a serious validation error - we must lower priority
    if priority > MAX_PRIORITY:
      # This is an error
      if len(uid_list) > 0:
        # Assign message back to 'error' state
        activity_tool.SQLDict_assignMessage(uid=uid_list,
                                            processing_node=VALIDATE_ERROR_STATE)
      #m.notifyUser(activity_tool)                                      # Notify Error
      get_transaction().commit()                                        # and commit
    else:
      # Lower priority
      if len(uid_list) > 0: # Add some delay before new processing
        activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                          priority=priority + 1, retry=1)
      get_transaction().commit() # Release locks before starting a potentially long calculation
    return 0
161

Jean-Paul Smets's avatar
Jean-Paul Smets committed
162
  # Queue semantic
Jean-Paul Smets's avatar
Jean-Paul Smets committed
163
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Pick the first message readable for processing_node, invoke it and
      update the message table according to the result.

      Steps:
        1. Read one message and mark every row sharing its path, method id
           and order validation text as being processed, then commit.
        2. Validate it. If it has a group_method_id, gather, mark and
           validate further messages with the same group method until
           MAX_GROUPED_OBJECTS objects are collected.
        3. Invoke the message (or the group method) and commit if at least
           one message was executed, abort otherwise.
        4. For every message: delete its rows if executed; otherwise delay
           them (conflict error), put them in INVOKE_ERROR_STATE (priority
           above MAX_PRIORITY) or delay them with a lowered priority.

      If anything raises during steps 2 and 3, the transaction is aborted
      and the marked rows are reset with SQLDict_setPriority.

      Return 1 if there was nothing to process (no SQLDict_readMessage
      method or no message), 0 otherwise.
    """
    readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
    if readMessage is None:
      return 1

    now_date = DateTime()
    result = readMessage(processing_node=processing_node, to_date=now_date)
    if len(result) == 0:
      get_transaction().commit() # Release locks before starting a potentially long calculation
      return 1

    line = result[0]
    path = line.path
    method_id = line.method_id
    order_validation_text = line.order_validation_text
    uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
                                                 processing_node=None, to_date=now_date,
                                                 order_validation_text=order_validation_text)
    uid_list = [x.uid for x in uid_list]
    # message_list, uid_list_list and priority_list are kept parallel:
    # entry i of each list describes the same message.
    uid_list_list = [uid_list]
    priority_list = [line.priority]
    # Make sure message can not be processed anylonger
    if len(uid_list) > 0:
      # Set selected messages to processing
      activity_tool.SQLDict_processMessage(uid=uid_list)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state

    # At this point, messages are marked as processed. So catch any kind of exception to make sure
    # that they are unmarked on error.
    try:
      m = self.loadMessage(line.message, uid=line.uid)
      message_list = [m]
      # Validate message (make sure object exists, priority OK, etc.)
      if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
        return 0

      group_method_id = m.activity_kw.get('group_method_id')
      if group_method_id is not None:
        # Count the number of objects to prevent too many objects.
        if m.hasExpandMethod():
          count = len(m.getObjectList(activity_tool))
        else:
          count = 1

        # The result is not used here; the traversal is kept so that a group
        # method which cannot be traversed still raises at this point.
        group_method = activity_tool.getPortalObject().restrictedTraverse(group_method_id)

        if count < MAX_GROUPED_OBJECTS:
          # Retrieve objects which have the same group method.
          result = readMessage(processing_node=processing_node,
                               to_date=now_date, group_method_id=group_method_id,
                               order_validation_text=order_validation_text)
          #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
          seen_key_set = set()
          for group_line in result:
            # Prevent using the same pair of a path and a method id.
            key = (group_line.path, group_line.method_id)
            if key in seen_key_set:
              continue
            seen_key_set.add(key)

            group_uid_list = activity_tool.SQLDict_readUidList(
                                 path=group_line.path,
                                 method_id=group_line.method_id,
                                 processing_node=None,
                                 to_date=now_date,
                                 order_validation_text=order_validation_text)
            group_uid_list = [x.uid for x in group_uid_list]
            if len(group_uid_list) > 0:
              # Set selected messages to processing
              activity_tool.SQLDict_processMessage(uid=group_uid_list)
            get_transaction().commit() # Release locks before starting a potentially long calculation

            # Save this newly marked uids as soon as possible.
            uid_list_list.append(group_uid_list)

            group_m = self.loadMessage(group_line.message, uid=group_line.uid)
            if self.validateMessage(activity_tool, group_m, group_uid_list,
                                    group_line.priority, processing_node):
              if group_m.hasExpandMethod():
                count += len(group_m.getObjectList(activity_tool))
              else:
                count += 1
              message_list.append(group_m)
              priority_list.append(group_line.priority)
              if count >= MAX_GROUPED_OBJECTS:
                break
            else:
              # If the uids were not valid, remove them from the list, as validateMessage
              # unmarked them.
              uid_list_list.pop()

        # Release locks before starting a potentially long calculation
        get_transaction().commit()

      # Try to invoke
      if group_method_id is not None:
        LOG('SQLDict', TRACE,
            'invoking a group method %s with %d objects '\
            ' (%d objects in expanded form)' % (
            group_method_id, len(message_list), count))
        activity_tool.invokeGroup(group_method_id, message_list)
      else:
        activity_tool.invoke(message_list[0])

      # Check if messages are executed successfully.
      # When some of them are executed successfully, it may not be acceptable to
      # abort the transaction, because these remain pending, only due to other
      # invalid messages. This means that a group method should not be used if
      # it has a side effect. For now, only indexing uses a group method, and this
      # has no side effect.
      for m in message_list:
        if m.is_executed:
          get_transaction().commit()
          break
      else:
        get_transaction().abort()
    except:
      # Deliberately catch everything: the rows marked above must be unmarked
      # whatever happens. Log the original error first, because the handlers
      # below may replace sys.exc_info().
      LOG('SQLDict', WARNING,
          'SQLDict.dequeueMessage raised an exception while preparing or invoking messages',
          error=sys.exc_info())

      # If an exception occurs, abort the transaction to minimize the impact,
      try:
        get_transaction().abort()
      except:
        # Unfortunately, database adapters may raise an exception against abort.
        LOG('SQLDict', WARNING,
            'abort failed, thus some objects may be modified accidentally')

      # An exception happens at somewhere else but invoke or invokeGroup, so messages
      # themselves should not be delayed.
      try:
        for uid_list in uid_list_list:
          if len(uid_list):
            # This only sets processing to zero.
            activity_tool.SQLDict_setPriority(uid=uid_list)
            get_transaction().commit()
      except:
        LOG('SQLDict', ERROR,
            'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
            error=sys.exc_info())
        raise
      return 0

    try:
      for m, uid_list, priority in zip(message_list, uid_list_list, priority_list):
        if m.is_executed:
          if len(uid_list) > 0:
            activity_tool.SQLDict_delMessage(uid=uid_list)       # Delete it
          get_transaction().commit()                             # If successful, commit
          if m.active_process:
            active_process = activity_tool.unrestrictedTraverse(m.active_process)
            if not active_process.hasActivity():
              # No more activity
              m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
        else:
          # exc_type may be an old-style class (ClassType) or a new-style
          # class (type); checking ClassType alone never matches exception
          # classes which are new-style.
          if isinstance(m.exc_type, (ClassType, type)) and \
             issubclass(m.exc_type, ConflictError):
            # If this is a conflict error, do not lower the priority but only delay.
            if len(uid_list) > 0:
              activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
            get_transaction().commit() # Release locks before starting a potentially long calculation
          elif priority > MAX_PRIORITY:
            # This is an error
            if len(uid_list) > 0:
              # Assign message back to 'error' state
              activity_tool.SQLDict_assignMessage(uid=uid_list,
                                                  processing_node=INVOKE_ERROR_STATE)
            m.notifyUser(activity_tool)                                       # Notify Error
            get_transaction().commit()                                        # and commit
          else:
            # Lower priority
            if len(uid_list) > 0:
              activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                                priority=priority + 1)
            get_transaction().commit() # Release locks before starting a potentially long calculation
    except:
      LOG('SQLDict', ERROR,
          'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
          error=sys.exc_info())
      raise

    return 0

345
  def hasActivity(self, activity_tool, object, **kw):
    """
      Tell whether messages are pending for object.

      Return 0 if the activity tool has no SQLDict_hasMessage method or if
      the query returns no row, 1 if object is None, and otherwise whether
      the message_count of the first row is positive. Extra keyword
      arguments are passed to SQLDict_hasMessage as additional filters.
    """
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    if object is None:
      return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
    my_object_path = '/'.join(object.getPhysicalPath())
    result = hasMessage(path=my_object_path, **kw)
    if len(result) > 0:
      return result[0].message_count > 0
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
357
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove (and optionally invoke first) the messages of object_path.

      object_path is a tuple

      If method_id is given, only messages for this method are flushed.
      If invoke is true, each distinct method id is validated and invoked
      once, and ActivityFlushError is raised if a message can not be
      validated or executed. Registered (not yet stored) messages are
      unregistered, stored messages are deleted.

      Nothing is done if the activity tool has no SQLDict_readMessageList
      method.

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used; this implementation accepts it
      (and **kw) but never reads it.

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
    """
    path = '/'.join(object_path)
    # LOG('Flush', 0, str((path, invoke, method_id)))
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return

    def invokeOrRaise(m, message_path):
      # Validate m, invoke it, and raise ActivityFlushError on any failure.
      validate_value = m.validate(self, activity_tool)
      if validate_value is VALID:
        activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
        if not m.is_executed:   # Make sure message could be invoked
          raise ActivityFlushError(
              'Could not evaluate %s on %s' % (m.method_id , message_path))
      elif validate_value is INVALID_PATH:
        # The message no longer exists
        raise ActivityFlushError(
            'The document %s does not exist' % message_path)
      else:
        raise ActivityFlushError(
            'Could not validate %s on %s' % (m.method_id , message_path))

    # Method ids already handled: prevents calling invoke twice
    flushed_method_id_set = set()

    # Parse each message in registered
    for m in activity_tool.getRegisteredMessageList(self):
      if m.object_path == object_path and (method_id is None or method_id == m.method_id):
        if m.method_id not in flushed_method_id_set:
          flushed_method_id_set.add(m.method_id)
          if invoke:
            invokeOrRaise(m, path)
        # Every matching message is unregistered, invoked or not.
        activity_tool.unregisterMessage(self, m)

    # Parse each message in SQL dict
    result = readMessageList(path=path, method_id=method_id,
                             processing_node=None, include_processing=0)
    for line in result:
      path = line.path
      line_method_id = line.method_id
      if line_method_id not in flushed_method_id_set:
        # Only invoke once (it would be different for a queue)
        # This is optimisation with the goal to process objects on the same
        # node and minimize network traffic with ZEO server
        flushed_method_id_set.add(line_method_id)
        m = self.loadMessage(line.message, uid=line.uid)
        if invoke:
          invokeOrRaise(m, path)

    if len(result):
      uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
                                                   processing_node=None)
      if len(uid_list) > 0:
        activity_tool.SQLDict_delMessage(uid=[x.uid for x in uid_list])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
433

434
  def getMessageList(self, activity_tool, processing_node=None, include_processing=0, **kw):
    """
      Return the stored messages as loaded message objects.

      Each message gets the processing_node, priority and processing values
      of its row. An empty list is returned if the activity tool has no
      SQLDict_readMessageList method.

      NOTE(review): only include_processing is forwarded to the query;
      the processing_node argument and **kw are accepted but ignored, so
      rows are not filtered by node - confirm this is intended.
    """
    # YO: reading all lines might cause a deadlock
    message_list = []
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return message_list
    result = readMessageList(path=None, method_id=None, processing_node=None,
                             to_processing_date=None,
                             include_processing=include_processing)
    for line in result:
      m = self.loadMessage(line.message, uid=line.uid)
      m.processing_node = line.processing_node
      m.priority = line.priority
      m.processing = line.processing
      message_list.append(m)
    return message_list

449 450 451
  def dumpMessageList(self, activity_tool):
    """
      Return every row given by activity_tool.SQLDict_dumpMessageList as a
      loaded message object, or an empty list if the activity tool has no
      such method.
    """
    # Dump all messages in the table.
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is None:
      return []
    return [self.loadMessage(line.message, uid=line.uid)
            for line in dumpMessageList()]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
460
  def distribute(self, activity_tool, node_count):
    """
      Assign the messages read with processing_node=-1 to processing nodes
      numbered from 1 to node_count.

      Only the messages kept by getExecutableMessageList are assigned.
        - A broadcast message is assigned to node 1, and one copy of it is
          written for each of the nodes 2..node_count.
        - Other messages are dispatched round robin, except that messages
          with the same path always go to the same node.
      The transaction is committed after each assignment.

      Nothing is done if the activity tool has no SQLDict_readMessageList
      method.
    """
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return

    now_date = DateTime()
    result = readMessageList(path=None, method_id=None, processing_node=-1,
                             to_date=now_date, include_processing=0)
    get_transaction().commit()

    validation_text_dict = {'none': 1}
    message_dict = {}
    for line in result:
      message = self.loadMessage(line.message, uid=line.uid,
                                 order_validation_text=line.order_validation_text)
      self.getExecutableMessageList(activity_tool, message, message_dict,
                                    validation_text_dict)

    # XXX probably this below can be optimized by assigning multiple messages at a time.
    path_dict = {}
    assignMessage = activity_tool.SQLDict_assignMessage
    processing_node = 1
    id_tool = activity_tool.getPortalObject().portal_ids
    for message in message_dict.values():
      path = '/'.join(message.object_path)
      activity_kw = message.activity_kw
      if activity_kw.get('broadcast', 0):
        # Broadcast messages must be distributed into all nodes.
        assignMessage(processing_node=1, uid=[message.uid])
        if node_count > 1:
          # One extra copy of the message per remaining node.
          copy_count = node_count - 1
          uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
                                                     id_count=copy_count)
          activity_tool.SQLDict_writeMessageList(
            uid_list=uid_list,
            path_list=[path] * copy_count,
            method_id_list=[message.method_id] * copy_count,
            priority_list=[activity_kw.get('priority', 1)] * copy_count,
            broadcast_list=[1] * copy_count,
            processing_node_list=range(2, node_count + 1),
            message_list=[self.dumpMessage(message)] * copy_count,
            date_list=[activity_kw.get('at_date', now_date)] * copy_count,
            group_method_id_list=[activity_kw.get('group_method_id', '')] * copy_count,
            tag_list=[activity_kw.get('tag', '')] * copy_count,
            order_validation_text_list=[message.order_validation_text] * copy_count)
        get_transaction().commit()
      else:
        # Select a processing node. If the same path appears again, dispatch the message to
        # the same node, so that object caching is more efficient. Otherwise, apply a round
        # robin scheduling.
        node = path_dict.get(path)
        if node is None:
          node = processing_node
          path_dict[path] = node
          processing_node += 1
          if processing_node > node_count:
            processing_node = 1

        assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
        get_transaction().commit() # Release locks immediately to allow processing of messages
Jean-Paul Smets's avatar
Jean-Paul Smets committed
527

528
  # Validation private methods
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

    if method_id or message_uid or path or tag:
      validateMessageList = activity_tool.SQLDict_validateMessageList
      result = validateMessageList(method_id=method_id,
                                   message_uid=message_uid,
                                   path=path,
                                   tag=tag)
      message_list = []
      for line in result:
        m = self.loadMessage(line.message,
                             uid=line.uid,
                             order_validation_text=line.order_validation_text,
                             date=line.date,
                             processing_node=line.processing_node)
        message_list.append(m)
      return message_list
    else:
      return []

555
  def _validate_after_method_id(self, activity_tool, message, value):
556
    return self._validate(activity_tool, method_id=value)
557

558
  def _validate_after_path(self, activity_tool, message, value):
559
    return self._validate(activity_tool, path=value)
560

561
  def _validate_after_message_uid(self, activity_tool, message, value):
562
    return self._validate(activity_tool, message_uid=value)
563

564
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
565 566 567 568 569
    if not isinstance(value, (tuple, list)) or len(value) < 2:
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_path_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, path=value[0], method_id=value[1])
570

571
  def _validate_after_tag(self, activity_tool, message, value):
572
    return self._validate(activity_tool, tag=value)
573

574 575 576 577 578 579 580 581 582 583 584
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
    if not isinstance(value, (tuple, list)) or len(value) < 2:
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_tag_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, tag=value[0], method_id=value[1])

  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

    tag, path and method_id may each be a single string or a sequence of
    strings; a single string is wrapped into a list before querying.
    Extra keyword arguments are accepted but not used.
    """
    if isinstance(tag, str):
      tag = [tag]
    if isinstance(path, str):
      path = [path]
    # Must be a plain "if": method_id has to be wrapped whatever the type
    # of path is.
    if isinstance(method_id, str):
      method_id = [method_id]
    result = activity_tool.SQLDict_validateMessageList(method_id=method_id,
                                                       path=path,
                                                       message_uid=message_uid,
                                                       tag=tag,
                                                       count=1)
    return result[0].uid_count

599
  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages which match the given tag.
    """
    return self.countMessage(activity_tool, tag=value)
603

604
  # Required for tests (time shift)
605
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
    """
      To simulate timeShift, we simply substract delay from
      all dates in SQLDict message table

      delay, processing_node and retry are passed unchanged to
      activity_tool.SQLDict_timeShift.
    """
    activity_tool.SQLDict_timeShift(delay=delay,
                                    processing_node=processing_node,
                                    retry=retry)
611

Jean-Paul Smets's avatar
Jean-Paul Smets committed
612
# Hand the SQLDict class to ActivityTool's registerActivity when this module
# is imported.
registerActivity(SQLDict)