SQLDict.py 18.5 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from RAMDict import RAMDict
32
from Products.CMFActivity.Errors import ActivityFlushError
33
import sys
34
#from time import time
35
from SQLBase import SQLBase, sort_message_key
Jean-Paul Smets's avatar
Jean-Paul Smets committed
36

37
import transaction
38

39
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
Jean-Paul Smets's avatar
Jean-Paul Smets committed
40

41 42
# Maximum number of messages a single distribute() call assigns to a
# processing node; once reached, distribute() returns.
MAX_VALIDATED_LIMIT = 1000
# Number of pending rows fetched per SQLDict_readMessageList() page in
# distribute().
READ_MESSAGE_LIMIT = 1000
# Maximum number of messages written per SQLDict_writeMessageList() call in
# prepareQueueMessageList().
MAX_MESSAGE_LIST_SIZE = 100


class SQLDict(RAMDict, SQLBase):
  """
    Activity queue storing each message as a row of the SQL table named by
    ``sql_table``.

    Two messages are duplicates when generateMessageUID() returns the same
    key for them (object path, method id, tag and group id); distribute()
    deletes all but one message of each such group.
  """
  sql_table = 'message'
  # NOTE(review): not read in this file -- presumably consumed by SQLBase to
  # enable merging of duplicate messages; confirm its exact effect there.
  merge_duplicate = True

  # Transaction commit methods
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write the still-registered messages of message_list into the SQL
      table, with one SQLDict_writeMessageList() call per chunk of at most
      MAX_MESSAGE_LIST_SIZE messages. Each chunk gets fresh uids from
      portal_ids (id_generator 'uid', id_group 'portal_activity').
    """
    def kw_values(chunk, key, default):
      # activity_kw[key] (or default) for every message of the chunk.
      return [message.activity_kw.get(key, default) for message in chunk]

    registered = [message for message in message_list if message.is_registered]
    start = 0
    while start < len(registered):
      chunk = registered[start:start + MAX_MESSAGE_LIST_SIZE]
      start += MAX_MESSAGE_LIST_SIZE

      path_list = ['/'.join(message.object_path) for message in chunk]
      active_process_uid_list = [message.active_process_uid for message in chunk]
      method_id_list = [message.method_id for message in chunk]
      priority_list = kw_values(chunk, 'priority', 1)
      dumped_message_list = [self.dumpMessage(message) for message in chunk]
      date_list = kw_values(chunk, 'at_date', None)
      # Stored as "<group_method_id>\0<group_id>"; an explicit
      # group_method_id of None selects a per-method dummy group method.
      group_method_id_list = []
      for message in chunk:
        group_method_id = message.activity_kw.get('group_method_id', '')
        if group_method_id is None:
          group_method_id = ('portal_activities/dummyGroupMethod/'
                             + message.method_id)
        group_method_id_list.append(
          group_method_id + '\0' + message.activity_kw.get('group_id', ''))
      tag_list = kw_values(chunk, 'tag', '')
      serialization_tag_list = kw_values(chunk, 'serialization_tag', '')
      order_validation_text_list = [self.getOrderValidationText(message)
                                    for message in chunk]
      # uids are allocated through portal_ids, one per message of the chunk.
      uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList(
        id_generator='uid', id_group='portal_activity', id_count=len(chunk))
      activity_tool.SQLDict_writeMessageList(
        uid_list=uid_list,
        path_list=path_list,
        active_process_uid_list=active_process_uid_list,
        method_id_list=method_id_list,
        priority_list=priority_list,
        message_list=dumped_message_list,
        date_list=date_list,
        group_method_id_list=group_method_id_list,
        tag_list=tag_list,
        serialization_tag_list=serialization_tag_list,
        processing_node_list=None,
        order_validation_text_list=order_validation_text_list)

  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete every SQL row sharing m's path, method id and order validation
      text, using a single SQLBase_delMessage() call.
    """
    row_list = activity_tool.SQLDict_readUidList(
      path='/'.join(m.object_path),
      method_id=m.method_id,
      order_validation_text=self.getOrderValidationText(m))
    doomed_uid_list = [row.uid for row in row_list]
    if doomed_uid_list:
      activity_tool.SQLBase_delMessage(table=self.sql_table,
                                       uid=doomed_uid_list)

  def finishQueueMessage(self, activity_tool_path, m):
    """SQLDict has nothing to do when finishing a queued message."""

  def finishDeleteMessage(self, activity_tool_path, m):
    """SQLDict has nothing to do when finishing a message deletion."""

  # Registration management
  def registerActivityBuffer(self, activity_buffer):
    """No-op for SQLDict."""

  def generateMessageUID(self, m):
    """
      Return the key under which messages are considered duplicates:
      (object path as a tuple, method id, activity_kw 'tag',
      activity_kw 'group_id'), the last two being None when absent.
    """
    activity_kw = m.activity_kw
    return (tuple(m.object_path),
            m.method_id,
            activity_kw.get('tag'),
            activity_kw.get('group_id'))

  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Withdraw message m before it reaches the SQL table.

      Clearing m.is_registered makes prepareQueueMessageList() and
      getRegisteredMessageList() skip it; its generateMessageUID() key is
      also discarded from activity_buffer.getUidSet(self).
    """
    # This prevents from inserting deleted messages into the queue.
    m.is_registered = 0
    activity_buffer.getUidSet(self).discard(self.generateMessageUID(m))

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """Return this queue's buffered messages whose is_registered is set."""
    still_registered = []
    for message in activity_buffer.getMessageList(self):
      if message.is_registered:
        still_registered.append(message)
    return still_registered

  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
    """
      Reserve, for processing_node, the unreserved messages matching
      ``line`` (same path, method id, group method id and order validation
      text) and return their uids.

      When nothing matches, SQLDict_commit() is called to release locks.
      On any error the transaction is rolled back and the error re-raised.
    """
    try:
      uid_list = [row.uid for row in activity_tool.SQLDict_selectDuplicatedLineList(
        path=line.path,
        method_id=line.method_id,
        group_method_id=line.group_method_id,
        order_validation_text=line.order_validation_text)]
      if not uid_list:
        # Release locks
        activity_tool.SQLDict_commit()
      else:
        activity_tool.SQLDict_reserveDuplicatedLineList(
          processing_node=processing_node, uid_list=uid_list)
    except:
      # Deliberately bare: locks must be released whatever went wrong, and
      # the exception is always propagated.
      LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception',
          error=sys.exc_info())
      activity_tool.SQLDict_rollback()
      raise
    return uid_list

  # RAMDict precedes SQLBase in this class's bases, so bind SQLBase's
  # implementation explicitly instead of relying on the method resolution
  # order (which would pick RAMDict's version if it defines one).
  dequeueMessage = SQLBase.dequeueMessage

  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
    """
      Tell whether the SQL table holds messages matching the filters.

      ``object`` may be None (no path filter); otherwise its physical path
      is used. Returns 0 when SQLDict_hasMessage is unavailable or returns
      no row, else whether the first row's message_count is positive.
    """
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    if object is not None:
      path = '/'.join(object.getPhysicalPath())
    else:
      path = None
    row_list = hasMessage(path=path, method_id=method_id,
                          only_valid=only_valid,
                          active_process_uid=active_process_uid)
    if len(row_list) == 0:
      return 0
    return row_list[0].message_count > 0

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Flush the activities pending on object_path (a tuple of path
      components), optionally restricted to method_id.

      Matching messages still in the activity buffer are unregistered and
      matching rows of the SQL table are deleted. When invoke is true, each
      distinct method id is executed once beforehand; ActivityFlushError is
      raised if a message cannot be validated or executed.
      commit and kw are accepted but not used by this implementation.
      Nothing is done if activity_tool has no SQLDict_readMessageList.
    """
    path = '/'.join(object_path)
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return

    # Method ids already handled: each one is invoked at most once per flush
    # (it would be different for a queue).
    seen_method_id_set = set()

    # Messages still in the activity buffer.
    for m in activity_tool.getRegisteredMessageList(self):
      if m.object_path == object_path and (method_id is None or
                                           method_id == m.method_id):
        if m.method_id not in seen_method_id_set:
          seen_method_id_set.add(m.method_id)
          if invoke:
            self._invokeFlushedMessage(activity_tool, m,
                                       m.validate(self, activity_tool), path)
        activity_tool.unregisterMessage(self, m)

    # Messages already stored in the SQL table.
    result = readMessageList(path=path, method_id=method_id,
                             processing_node=None, include_processing=0,
                             to_date=None)
    for line in result:
      if line.method_id in seen_method_id_set:
        continue
      # Invoking once per method id is an optimisation with the goal to
      # process objects on the same node and minimize network traffic with
      # the ZEO server.
      seen_method_id_set.add(line.method_id)
      m = self.loadMessage(line.message, uid=line.uid, line=line)
      if invoke:
        # Only validate messages still marked as new.
        if line.processing_node == -1:
          validate_value = m.validate(self, activity_tool)
        else:
          validate_value = VALID
        self._invokeFlushedMessage(activity_tool, m, validate_value,
                                   line.path)

    if len(result):
      # Delete by the flushed path, not by the path of an individual row.
      uid_list = activity_tool.SQLDict_readUidList(path=path,
                                                   method_id=method_id,
                                                   order_validation_text=None)
      if len(uid_list):
        activity_tool.SQLBase_delMessage(table=self.sql_table,
                                         uid=[x.uid for x in uid_list])

  def _invokeFlushedMessage(self, activity_tool, m, validate_value, path):
    """
      Execute message m now, given the result of its validation.

      Raises ActivityFlushError when validate_value is not VALID, or when m
      is not in the MESSAGE_EXECUTED state after activity_tool.invoke(m).
      path is only used to build the error messages.
    """
    if validate_value is VALID:
      # NOTE(review): the case where the invoked method itself calls
      # flushActivity is not handled specially.
      activity_tool.invoke(m)
      if m.getExecutionState() != MESSAGE_EXECUTED:
        raise ActivityFlushError(
          'Could not evaluate %s on %s' % (m.method_id, path))
    elif validate_value is INVALID_PATH:
      # The document the message applies to no longer exists.
      raise ActivityFlushError('The document %s does not exist' % path)
    else:
      raise ActivityFlushError(
        'Could not validate %s on %s' % (m.method_id, path))

  # RAMDict precedes SQLBase in this class's bases, so bind SQLBase's
  # implementation explicitly instead of relying on the method resolution
  # order (which would pick RAMDict's version if it defines one).
  getMessageList = SQLBase.getMessageList

  def dumpMessageList(self, activity_tool):
    """
      Return all messages of the SQL table as loaded message objects, or an
      empty list when SQLDict_dumpMessageList is unavailable.
    """
    dump = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dump is None:
      return []
    return [self.loadMessage(row.message, uid=row.uid, line=row)
            for row in dump()]

  def distribute(self, activity_tool, node_count):
    """
      Assign due, executable messages that are still new
      (processing_node == -1) to processing_node 0.

      Pending rows are read READ_MESSAGE_LIMIT at a time. For each page the
      executable messages are collected with getExecutableMessageList(),
      redundant duplicates are deleted and the remaining ones assigned (see
      _getDistributableAndDeletableUids). Returns when no pending row is
      left or once MAX_VALIDATED_LIMIT messages were assigned by this call.
      node_count is not used here.
    """
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return
    now_date = self.getNow(activity_tool)
    validated_count = 0
    offset = 0
    while 1:
      result = readMessageList(path=None, method_id=None, processing_node=-1,
                               to_date=now_date, include_processing=0,
                               offset=offset, count=READ_MESSAGE_LIMIT)
      if not result:
        return
      # NOTE(review): presumably ends the transaction opened by the read
      # (releasing its locks) before validation starts; confirm.
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in result:
        message = self.loadMessage(line.message, uid=line.uid, line=line,
          order_validation_text=line.order_validation_text)
        self.getExecutableMessageList(activity_tool, message, message_dict,
                                      validation_text_dict, now_date=now_date)

      removed_uid_set = set()
      if message_dict:
        distributable_uid_set, deletable_uid_list = \
          self._getDistributableAndDeletableUids(message_dict)
        if deletable_uid_list:
          activity_tool.SQLBase_delMessage(table=self.sql_table,
                                           uid=deletable_uid_list)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          activity_tool.SQLBase_assignMessage(table=self.sql_table,
            processing_node=0, uid=tuple(distributable_uid_set))
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
        removed_uid_set.update(distributable_uid_set)
        removed_uid_set.update(deletable_uid_list)

      # Rows deleted or assigned above no longer match processing_node=-1,
      # so the rows after this page moved up by that many positions:
      # advancing by a whole page would skip pending messages. Only rows of
      # this page are counted -- NOTE(review): message_dict may also hold
      # messages from outside the page, depending on
      # getExecutableMessageList().
      page_uid_set = set([line.uid for line in result])
      offset += len(result) - len(page_uid_set & removed_uid_set)

  def _getDistributableAndDeletableUids(self, message_dict):
    """
      Decide what distribute() does with the executable messages of
      message_dict (a dict whose values are messages).

      Returns (distributable_uid_set, deletable_uid_list): uids to assign to
      a processing node, and uids of redundant duplicates to delete.
    """
    message_unique_dict = {}
    serialization_tag_dict = {}
    distributable_uid_set = set()
    deletable_uid_list = []

    # Remove duplicates. SQLDict considers object_path, method_id, tag and
    # group_id (see generateMessageUID) to unify activities, but ignores
    # method arguments. They are outside of semantics.
    for message in message_dict.itervalues():
      message_unique_dict.setdefault(self.generateMessageUID(message),
                                     []).append(message)
    for message_list in message_unique_dict.itervalues():
      if len(message_list) > 1:
        # Sort list of duplicates to keep the message with highest score
        message_list.sort(key=sort_message_key)
        deletable_uid_list += [m.uid for m in message_list[1:]]
      message = message_list[0]
      serialization_tag = message.activity_kw.get('serialization_tag')
      if serialization_tag is None:
        distributable_uid_set.add(message.uid)
      else:
        serialization_tag_dict.setdefault(serialization_tag,
                                          []).append(message)

    # Only one message per serialization tag may be validated; the others
    # must wait. Messages sharing that message's group_method_id are the
    # exception: they have to be processed together at once.
    for message_list in serialization_tag_dict.itervalues():
      # Sort list of messages to validate the message with highest score
      message_list.sort(key=sort_message_key)
      first = message_list[0]
      distributable_uid_set.add(first.uid)
      group_method_id = first.line.group_method_id
      # '\0' is what prepareQueueMessageList stores when neither
      # group_method_id nor group_id was given.
      if group_method_id == '\0':
        continue
      for message in message_list[1:]:
        if group_method_id == message.line.group_method_id:
          distributable_uid_set.add(message.uid)

    return distributable_uid_set, deletable_uid_list

  # Validation private methods
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    """
      Load the queued messages matching the given filters.

      method_id, path and tag may each be a single string or a list.
      Returns an empty list, without querying, when no filter is set.
    """
    def as_list(value):
      # A lone string stands for a one-element list.
      if isinstance(value, str):
        return [value]
      return value

    method_id = as_list(method_id)
    path = as_list(path)
    tag = as_list(tag)
    if not (method_id or message_uid or path or tag or serialization_tag):
      return []
    row_list = activity_tool.SQLDict_validateMessageList(
      method_id=method_id,
      message_uid=message_uid,
      path=path,
      tag=tag,
      count=False,
      serialization_tag=serialization_tag)
    return [self.loadMessage(row.message,
                             line=row,
                             uid=row.uid,
                             date=row.date,
                             processing_node=row.processing_node,
                             order_validation_text=row.order_validation_text)
            for row in row_list]

  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """
      Return the number of queued messages matching the given filters.

      tag, path and method_id may each be a single string or a list;
      additional keyword arguments are ignored.
    """
    def listify(value):
      # A lone string stands for a one-element list.
      if isinstance(value, str):
        return [value]
      return value

    row_list = activity_tool.SQLDict_validateMessageList(
      method_id=listify(method_id),
      path=listify(path),
      message_uid=message_uid,
      tag=listify(tag),
      serialization_tag=None,
      count=1)
    return row_list[0].uid_count

  def countMessageWithTag(self, activity_tool, value):
    """Return how many queued messages carry the tag ``value``."""
    count = self.countMessage(activity_tool, tag=value)
    return count

  # Required for tests (time shift)
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
    """
      Simulate the passing of time: SQLDict_timeShift is asked to subtract
      ``delay`` from the message dates, processing_node and retry being
      passed through unchanged.
    """
    activity_tool.SQLDict_timeShift(delay=delay,
                                    processing_node=processing_node,
                                    retry=retry)

  def getPriority(self, activity_tool):
    """
      Return _getPriority() computed from SQLDict_getPriority, with
      RAMDict's priority passed as the default.
    """
    query = activity_tool.SQLDict_getPriority
    fallback = RAMDict.getPriority(self, activity_tool)
    return self._getPriority(activity_tool, query, fallback)

# Hand the SQLDict class over to CMFActivity's registerActivity().
registerActivity(SQLDict)