SQLDict.py 29.5 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from RAMDict import RAMDict
32
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
33
from Products.CMFActivity.Errors import ActivityFlushError
34
from ZODB.POSException import ConflictError
35
import sys
36
from types import ClassType
37
#from time import time
38
from SQLBase import SQLBase, sort_message_key
39 40
from Products.CMFActivity.ActivityRuntimeEnvironment import (
  ActivityRuntimeEnvironment, getTransactionalVariable)
41
from zExceptions import ExceptionFormatter
Jean-Paul Smets's avatar
Jean-Paul Smets committed
42

43
import transaction
44

45
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46

47 48
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000 
Yoshinori Okuji's avatar
Yoshinori Okuji committed
49
# Read up to this number of messages to validate.
50
READ_MESSAGE_LIMIT = 1000
Yoshinori Okuji's avatar
Yoshinori Okuji committed
51 52
# Stop electing more messages for processing if more than this number of
# objects are impacted by elected messages.
53
MAX_GROUPED_OBJECTS = 100
Jean-Paul Smets's avatar
Jean-Paul Smets committed
54

55 56
MAX_MESSAGE_LIST_SIZE = 100

57
class SQLDict(RAMDict, SQLBase):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
58 59 60 61 62
  """
    A simple OOBTree based queue. It should be compatible with transactions
    and provide sequentiality. Should not create conflict
    because use of OOBTree.
  """
63 64
  sql_table = 'message'

Jean-Paul Smets's avatar
Jean-Paul Smets committed
65
  # Transaction commit methods
66
  def prepareQueueMessageList(self, activity_tool, message_list):
67 68 69
    message_list = [m for m in message_list if m.is_registered]
    for i in xrange(0, len(message_list), MAX_MESSAGE_LIST_SIZE):
      registered_message_list = message_list[i:i + MAX_MESSAGE_LIST_SIZE]
70
      #LOG('SQLDict prepareQueueMessageList', 0, 'registered_message_list = %r' % (registered_message_list,))
71
      path_list = ['/'.join(message.object_path) for message in registered_message_list]
72
      active_process_uid_list = [message.active_process_uid for message in registered_message_list]
73 74 75
      method_id_list = [message.method_id for message in registered_message_list]
      priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
      dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
76
      date_list = [message.activity_kw.get('at_date', None) for message in registered_message_list]
77 78
      group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
                              for message in registered_message_list]
79
      tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
80
      serialization_tag_list = [message.activity_kw.get('serialization_tag', '') for message in registered_message_list]
81
      order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
82 83 84 85
      # The uid_list also is store in the ZODB
      uid_list = activity_tool.getPortalObject().portal_ids.\
                                           generateNewIdList(id_generator='uid', id_group='portal_activity',
                                           id_count=len(registered_message_list))
86 87
      activity_tool.SQLDict_writeMessageList( uid_list = uid_list,
                                              path_list = path_list,
88
                                              active_process_uid_list=active_process_uid_list,
89 90 91
                                              method_id_list = method_id_list,
                                              priority_list = priority_list,
                                              message_list = dumped_message_list,
92
                                              date_list = date_list,
93
                                              group_method_id_list = group_method_id_list,
94
                                              tag_list = tag_list,
95
                                              serialization_tag_list = serialization_tag_list,
96
                                              processing_node_list=None,
97
                                              order_validation_text_list = order_validation_text_list)
98

99 100
  def prepareDeleteMessage(self, activity_tool, m):
    # Erase all messages in a single transaction
Jean-Paul Smets's avatar
Jean-Paul Smets committed
101
    path = '/'.join(m.object_path)
102 103
    order_validation_text = self.getOrderValidationText(m)
    uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = m.method_id,
104
                                                 order_validation_text = order_validation_text)
105
    uid_list = [x.uid for x in uid_list]
106
    if len(uid_list)>0:
107
      activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
108

109 110 111 112 113 114 115 116
  def finishQueueMessage(self, activity_tool_path, m):
    # Nothing to do in SQLDict.
    pass

  def finishDeleteMessage(self, activity_tool_path, m):
    # Nothing to do in SQLDict.
    pass

117
  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
118
  def registerActivityBuffer(self, activity_buffer):
119
    pass
120

121 122
  def generateMessageUID(self, m):
    return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
123

Jean-Paul Smets's avatar
Jean-Paul Smets committed
124 125
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
126
    class_name = self.__class__.__name__
127
    uid_set = activity_buffer.getUidSet(self)
128
    uid_set.discard(self.generateMessageUID(m))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
129 130

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
131 132
    message_list = activity_buffer.getMessageList(self)
    return [m for m in message_list if m.is_registered]
133

134
  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
135 136 137 138 139 140 141 142 143 144 145
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not garanted to be reached, because of:
         - not enough messages being pending execution
         - race condition (other nodes reserving the same messages at the same
           time)
        This number is guaranted not to be exceeded.
        If None (or not given) no limit apply.
    """
146 147 148 149
    result = not group_method_id and \
      activity_tool.SQLDict_selectReservedMessageList(
        processing_node=processing_node, count=limit)
    if not result:
150
      activity_tool.SQLDict_reserveMessageList(count=limit, processing_node=processing_node, to_date=date, group_method_id=group_method_id)
151
      result = activity_tool.SQLDict_selectReservedMessageList(processing_node=processing_node, count=limit)
152 153 154 155 156 157 158 159 160
    return result

  def makeMessageListAvailable(self, activity_tool, uid_list):
    """
      Put messages back in processing_node=0 .
    """
    if len(uid_list):
      activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)

161
  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
162
    """
163 164
      Reserve unreserved messages matching given line.
      Return their uids.
165
    """
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
    try:
      result = activity_tool.SQLDict_selectDuplicatedLineList(
        path=line.path,
        method_id=line.method_id,
        group_method_id=line.group_method_id,
        order_validation_text=line.order_validation_text
      )
      uid_list = [x.uid for x in result]
      if len(uid_list):
        activity_tool.SQLDict_reserveDuplicatedLineList(
          processing_node=processing_node,
          uid_list=uid_list
        )
      else:
        # Release locks
        activity_tool.SQLDict_commit()
    except:
      # Log
      LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
      # Release lock
      activity_tool.SQLDict_rollback()
      # And re-raise
      raise
    return uid_list
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214

  def getProcessableMessageList(self, activity_tool, processing_node):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expandable since redundant).

      - reserve a message
      - set reserved message to processing=1 state
      - if this message has a group_method_id:
        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
        - untill number of impacted objects goes over MAX_GROUPED_OBJECTS
          - get one message from the reserved bunch (this messages will be
            "needed")
          - increase the number of impacted object
        - set "needed" reserved messages to processing=1 state
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
215
        4-tuple:
216
          - list of messages
217 218
          - impacted object count
          - group_method_id
219
          - uid_to_duplicate_uid_list_dict
220
    """
221
    def getReservedMessageList(limit, group_method_id=None):
222 223 224
      line_list = self.getReservedMessageList(activity_tool=activity_tool,
                                              date=now_date,
                                              processing_node=processing_node,
225 226
                                              limit=limit,
                                              group_method_id=group_method_id)
227 228 229
      if len(line_list):
        LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
      return line_list
230 231 232 233 234 235
    def getDuplicateMessageUidList(line):
      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool, 
        line=line, processing_node=processing_node)
      if len(uid_list):
        LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
      return uid_list
236 237 238 239 240 241 242 243 244
    def makeMessageListAvailable(uid_list):
      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
    now_date = self.getNow(activity_tool)
    message_list = []
    count = 0
    group_method_id = None
    try:
      result = getReservedMessageList(limit=1)
245
      uid_to_duplicate_uid_list_dict = {}
246 247
      if len(result) > 0:
        line = result[0]
Julien Muchembled's avatar
Julien Muchembled committed
248
        uid = line.uid
249 250
        m = self.loadMessage(line.message, uid=uid, line=line)
        message_list.append(m)
251
        group_method_id = line.group_method_id
Julien Muchembled's avatar
Julien Muchembled committed
252 253 254
        activity_tool.SQLDict_processMessage(uid=[uid])
        uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
          .extend(getDuplicateMessageUidList(line))
255
        if group_method_id not in (None, '', '\0'):
256
          # Count the number of objects to prevent too many objects.
257
          count += len(m.getObjectList(activity_tool))
258 259
          if count < MAX_GROUPED_OBJECTS:
            # Retrieve objects which have the same group method.
260
            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
261
            path_and_method_id_dict = {}
262
            unreserve_uid_list = []
263
            for line in result:
264 265
              if line.uid == uid:
                continue
266 267 268 269
              # All fetched lines have the same group_method_id and
              # processing_node.
              # Their dates are lower-than or equal-to now_date.
              # We read each line once so lines have distinct uids.
270 271 272
              # So what remains to be filtered on are path, method_id and
              # order_validation_text.
              key = (line.path, line.method_id, line.order_validation_text)
273 274 275
              original_uid = path_and_method_id_dict.get(key)
              if original_uid is not None:
                uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
276
                continue
277
              path_and_method_id_dict[key] = line.uid
278
              uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
279
              if count < MAX_GROUPED_OBJECTS:
280
                m = self.loadMessage(line.message, uid=line.uid, line=line)
281
                count += len(m.getObjectList(activity_tool))
282
                message_list.append(m)
283
              else:
284
                unreserve_uid_list.append(line.uid)
285
            activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
286 287
            # Unreserve extra messages as soon as possible.
            makeMessageListAvailable(unreserve_uid_list)
288
      return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
289 290 291
    except:
      LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
      if len(message_list):
292
        to_free_uid_list = [m.uid for m in message_list]
293 294 295
        try:
          makeMessageListAvailable(to_free_uid_list)
        except:
296
          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
297
        else:
298 299 300 301
          if len(to_free_uid_list):
            LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
      else:
        LOG('SQLDict', TRACE, '(no message was reserved)')
302
      return [], 0, None, {}
303 304 305

  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node):
306 307 308 309 310 311
    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
      final_uid_list = []
      for uid in uid_list:
        final_uid_list.append(uid)
        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
312
    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
313
      self.getProcessableMessageList(activity_tool, processing_node)
314
    if message_list:
315 316 317 318 319 320
      # Remove group_id parameter from group_method_id
      if group_method_id is not None:
        group_method_id = group_method_id.split('\0')[0]
      if group_method_id not in (None, ""):
        method  = activity_tool.invokeGroup
        args = (group_method_id, message_list)
321
        activity_runtime_environment = ActivityRuntimeEnvironment(None)
322 323
      else:
        method = activity_tool.invoke
324 325
        message = message_list[0]
        args = (message, )
326
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
327
      # Commit right before executing messages.
Julien Muchembled's avatar
Julien Muchembled committed
328
      # As MySQL transaction does not start exactly at the same time as ZODB
329 330 331
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
Julien Muchembled's avatar
Julien Muchembled committed
332
      # So all connectors must be committed now that we have selected
333
      # everything needed from MySQL to get a fresh view of ZODB objects.
334
      transaction.commit()
335 336
      tv = getTransactionalVariable(None)
      tv['activity_runtime_environment'] = activity_runtime_environment
337
      # Try to invoke
338 339 340
      try:
        method(*args)
      except:
341
        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
342
        try:
343
          transaction.abort()
344 345
        except:
          # Unfortunately, database adapters may raise an exception against abort.
346
          LOG('SQLDict', PANIC,
347
              'abort failed, thus some objects may be modified accidentally')
348
          raise
349 350
        # XXX Is it still useful to free messages now that this node is able
        #     to reselect them ?
351
        to_free_uid_list = [x.uid for x in message_list]
352
        try:
353
          makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
354
        except:
355
          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
356 357
        else:
          LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
358
      # Abort if something failed.
359
      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
360
        endTransaction = transaction.abort
361
      else:
362
        endTransaction = transaction.commit
Yoshinori Okuji's avatar
Yoshinori Okuji committed
363
      try:
364
        endTransaction()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
365
      except:
366
        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
367
        if endTransaction == transaction.abort:
368 369 370
          LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
        else:
          try:
371
            transaction.abort()
372 373
          except:
            LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
374
            raise
375
        exc_info = sys.exc_info()
376 377
        for m in message_list:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
378
        try:
379
          makeMessageListAvailable([x.uid for x in message_list], uid_to_duplicate_uid_list_dict)
380
        except:
381
          LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (message_list, ), error=sys.exc_info())
382
        else:
383 384
          LOG('SQLDict', TRACE, 'Freed messages %r' % (message_list, ))
      self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
385
    transaction.commit()
386
    return not message_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
387

388
  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
389 390
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is not None:
391 392 393 394
      if object is None:
        my_object_path = None
      else:
        my_object_path = '/'.join(object.getPhysicalPath())
395 396 397
      result = hasMessage(path=my_object_path, method_id=method_id, only_valid=only_valid, active_process_uid=active_process_uid)
      if len(result) > 0:
        return result[0].message_count > 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
398 399
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
400
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
401 402
    """
      object_path is a tuple
Jean-Paul Smets's avatar
Jean-Paul Smets committed
403 404 405 406 407 408

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used
409 410

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
Jean-Paul Smets's avatar
Jean-Paul Smets committed
411 412
    """
    path = '/'.join(object_path)
413
    # LOG('Flush', 0, str((path, invoke, method_id)))
414
    method_dict = {}
415 416
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
417 418
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
419
        if m.object_path == object_path and (method_id is None or method_id == m.method_id):
420
          #if not method_dict.has_key(method_id or m.method_id):
421 422
          if not method_dict.has_key(m.method_id):
            method_dict[m.method_id] = 1 # Prevents calling invoke twice
423
            if invoke:
424 425
              # First Validate
              validate_value = m.validate(self, activity_tool)
426
              if validate_value is VALID:
427
                activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
428
                if m.getExecutionState() != MESSAGE_EXECUTED:                                                 # Make sure message could be invoked
429 430
                  # The message no longer exists
                  raise ActivityFlushError, (
431
                      'Could not evaluate %s on %s' % (m.method_id , path))
432
              elif validate_value is INVALID_PATH:
433 434
                # The message no longer exists
                raise ActivityFlushError, (
435
                    'The document %s does not exist' % path)
436 437 438 439
              else:
                raise ActivityFlushError, (
                    'Could not validate %s on %s' % (m.method_id , path))
          activity_tool.unregisterMessage(self, m)
440
      # Parse each message in SQL dict
441
      result = readMessageList(path=path, method_id=method_id,
442
                               processing_node=None,include_processing=0, to_date=None)
443 444
      for line in result:
        path = line.path
445 446
        line_method_id = line.method_id
        if not method_dict.has_key(line_method_id):
447
          # Only invoke once (it would be different for a queue)
448 449
          # This is optimisation with the goal to process objects on the same
          # node and minimize network traffic with ZEO server
450
          method_dict[line_method_id] = 1
451
          m = self.loadMessage(line.message, uid=line.uid, line=line)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
452
          if invoke:
453 454 455 456 457
            # First Validate (only if message is marked as new)
            if line.processing_node == -1:
              validate_value = m.validate(self, activity_tool)
            else:
              validate_value = VALID
Romain Courteaud's avatar
Romain Courteaud committed
458
#             LOG('SQLDict.flush validate_value',0,validate_value)
459
            if validate_value is VALID:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
460
              activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
461
              if m.getExecutionState() != MESSAGE_EXECUTED:                                                 # Make sure message could be invoked
Jean-Paul Smets's avatar
Jean-Paul Smets committed
462 463
                # The message no longer exists
                raise ActivityFlushError, (
464
                    'Could not evaluate %s on %s' % (m.method_id , path))
465
            elif validate_value is INVALID_PATH:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
466 467
              # The message no longer exists
              raise ActivityFlushError, (
468
                  'The document %s does not exist' % path)
469 470 471
            else:
              raise ActivityFlushError, (
                  'Could not validate %s on %s' % (m.method_id , path))
472 473

      if len(result):
474
        uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
475
                                                     order_validation_text=None)
476
        if len(uid_list)>0:
477 478
          activity_tool.SQLBase_delMessage(table=self.sql_table,
                                           uid=[x.uid for x in uid_list])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
479

480
  getMessageList = SQLBase.getMessageList
481

482 483 484
  def dumpMessageList(self, activity_tool):
    # Dump all messages in the table.
    message_list = []
485 486 487
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is not None:
      result = dumpMessageList()
488
      for line in result:
489
        m = self.loadMessage(line.message, uid=line.uid, line=line)
490 491 492
        message_list.append(m)
    return message_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
493
  def distribute(self, activity_tool, node_count):
494
    offset = 0
495 496
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
497 498
      now_date = self.getNow(activity_tool)
      validated_count = 0
Julien Muchembled's avatar
Julien Muchembled committed
499 500 501 502 503 504
      while 1:
        result = readMessageList(path=None, method_id=None, processing_node=-1,
                                 to_date=now_date, include_processing=0,
                                 offset=offset, count=READ_MESSAGE_LIMIT)
        if not result:
          return
505
        transaction.commit()
506 507 508 509

        validation_text_dict = {'none': 1}
        message_dict = {}
        for line in result:
510 511
          message = self.loadMessage(line.message, uid=line.uid, line=line,
            order_validation_text=line.order_validation_text)
512 513
          self.getExecutableMessageList(activity_tool, message, message_dict,
                                        validation_text_dict, now_date=now_date)
514 515

        if message_dict:
516 517 518
          message_unique_dict = {}
          serialization_tag_dict = {}
          distributable_uid_set = set()
519
          deletable_uid_list = []
520

521 522 523
          # remove duplicates
          # SQLDict considers object_path, method_id, tag to unify activities,
          # but ignores method arguments. They are outside of semantics.
524 525 526 527 528 529 530 531 532 533 534 535 536 537
          for message in message_dict.itervalues():
            message_unique_dict.setdefault(self.generateMessageUID(message),
                                           []).append(message)
          for message_list in message_unique_dict.itervalues():
            if len(message_list) > 1:
              # Sort list of duplicates to keep the message with highest score
              message_list.sort(key=sort_message_key)
              deletable_uid_list += [m.uid for m in message_list[1:]]
            message = message_list[0]
            distributable_uid_set.add(message.uid)
            serialization_tag = message.activity_kw.get('serialization_tag')
            if serialization_tag is not None:
              serialization_tag_dict.setdefault(serialization_tag,
                                                []).append(message)
538 539
          # Don't let through if there is the same serialization tag in the
          # message dict. If there is the same serialization tag, only one can
540
          # be validated and others must wait.
541 542
          # But messages with group_method_id are exceptions. serialization_tag
          # does not stop validating together. Because those messages should
543
          # be processed together at once.
544 545 546 547 548 549 550 551 552 553
          for message_list in serialization_tag_dict.itervalues():
            if len(message_list) == 1:
              continue
            # Sort list of messages to validate the message with highest score
            message_list.sort(key=sort_message_key)
            group_method_id = message_list[0].activity_kw.get('group_method_id')
            for message in message_list[1:]:
              if group_method_id is None or \
                 group_method_id != message.activity_kw.get('group_method_id'):
                distributable_uid_set.remove(message.uid)
554
          if deletable_uid_list:
555 556
            activity_tool.SQLBase_delMessage(table=self.sql_table,
                                             uid=deletable_uid_list)
557 558 559 560 561
          distributable_count = len(distributable_uid_set)
          if distributable_count:
            activity_tool.SQLBase_assignMessage(table=self.sql_table,
              processing_node=0, uid=tuple(distributable_uid_set))
            validated_count += distributable_count
Julien Muchembled's avatar
Julien Muchembled committed
562 563 564
            if validated_count >= MAX_VALIDATED_LIMIT:
              return
        offset += READ_MESSAGE_LIMIT
Jean-Paul Smets's avatar
Jean-Paul Smets committed
565

566
  # Validation private methods
567 568
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
569 570 571 572 573 574 575
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

576
    if method_id or message_uid or path or tag or serialization_tag:
577 578 579 580
      validateMessageList = activity_tool.SQLDict_validateMessageList
      result = validateMessageList(method_id=method_id,
                                   message_uid=message_uid,
                                   path=path,
581
                                   tag=tag,
582
                                   count=False,
583
                                   serialization_tag=serialization_tag)
584 585 586
      message_list = []
      for line in result:
        m = self.loadMessage(line.message,
587
                             line=line,
588 589 590 591 592 593 594 595 596 597 598 599
                             uid=line.uid,
                             order_validation_text=line.order_validation_text,
                             date=line.date,
                             processing_node=line.processing_node)
        message_list.append(m)
      return message_list
    else:
      return []

  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.
Sebastien Robin's avatar
Sebastien Robin committed
600
    """
601
    if isinstance(tag, str):
Sebastien Robin's avatar
Sebastien Robin committed
602
      tag = [tag]
603
    if isinstance(path, str):
Sebastien Robin's avatar
Sebastien Robin committed
604
      path = [path]
Jérome Perrin's avatar
bug.  
Jérome Perrin committed
605
    if isinstance(method_id, str):
Sebastien Robin's avatar
Sebastien Robin committed
606 607 608 609
      method_id = [method_id]
    result = activity_tool.SQLDict_validateMessageList(method_id=method_id, 
                                                       path=path,
                                                       message_uid=message_uid, 
610
                                                       tag=tag,
611
                                                       serialization_tag=None,
612
                                                       count=1)
Sebastien Robin's avatar
Sebastien Robin committed
613 614
    return result[0].uid_count

615
  def countMessageWithTag(self, activity_tool, value):
616
    """Return the number of messages which match the given tag.
617
    """
618
    return self.countMessage(activity_tool, tag=value)
619

620
  # Required for tests (time shift)
621
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
622 623 624 625
    """
      To simulate timeShift, we simply substract delay from
      all dates in SQLDict message table
    """
626
    activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
627

628 629 630 631 632
  def getPriority(self, activity_tool):
    method = activity_tool.SQLDict_getPriority
    default =  RAMDict.getPriority(self, activity_tool)
    return self._getPriority(activity_tool, method, default)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
633
registerActivity(SQLDict)