SQLQueue.py 12.9 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH
31
from Products.CMFActivity.Errors import ActivityFlushError
Romain Courteaud's avatar
Romain Courteaud committed
32
from ZODB.POSException import ConflictError
33
from SQLBase import SQLBase, sort_message_key
34
from zExceptions import ExceptionFormatter
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35

36
import transaction
37

38
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
# Stop validating more messages when this limit is reached: distribute()
# returns once it has assigned at least this many messages.
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate per query in distribute(); this is
# also the step by which distribute() advances its read offset.
READ_MESSAGE_LIMIT = 1000

45 46
# Maximum number of messages passed to a single SQLQueue_writeMessageList
# call by prepareQueueMessageList().
MAX_MESSAGE_LIST_SIZE = 100

47
class SQLQueue(SQLBase):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
48
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49 50 51
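    A simple queue of activity messages, stored in the SQL table named by
    sql_table. It should be compatible with transactions and provide
    sequentiality.
    NOTE(review): this text used to describe an OOBTree based queue which
    avoids conflicts thanks to the OOBTree; no OOBTree is used in this class.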
Jean-Paul Smets's avatar
Jean-Paul Smets committed
52
  """
53
  sql_table = 'message_queue'
54
  merge_duplicate = False
55 56

  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write the registered messages of message_list to the SQL table.

      Messages whose is_registered attribute is false are ignored. The
      others are written in batches of at most MAX_MESSAGE_LIST_SIZE
      messages, with one SQLQueue_writeMessageList call per batch.
      As a side effect, order_validation_text is set on every written
      message before it is dumped.
    """
    pending_list = [m for m in message_list if m.is_registered]
    for start in xrange(0, len(pending_list), MAX_MESSAGE_LIST_SIZE):
      batch = pending_list[start:start + MAX_MESSAGE_LIST_SIZE]
      # The uid_list is also stored in the ZODB
      portal_ids = activity_tool.getPortalObject().portal_ids
      uid_list = portal_ids.generateNewIdList(
        id_generator='uid',
        id_group='portal_activity_queue',
        id_count=len(batch))
      path_list = ['/'.join(m.object_path) for m in batch]
      active_process_uid_list = [m.active_process_uid for m in batch]
      method_id_list = [m.method_id for m in batch]
      priority_list = [m.activity_kw.get('priority', 1) for m in batch]
      date_list = [m.activity_kw.get('at_date') for m in batch]
      group_method_id_list = [m.getGroupId() for m in batch]
      tag_list = [m.activity_kw.get('tag', '') for m in batch]
      serialization_tag_list = [
        m.activity_kw.get('serialization_tag', '') for m in batch]
      # order_validation_text must be set before dumping the message.
      # Messages whose validation text is 'none' are written with
      # processing_node 0, all the others with -1.
      processing_node_list = []
      for m in batch:
        order_validation_text = self.getOrderValidationText(m)
        m.order_validation_text = order_validation_text
        if order_validation_text == 'none':
          processing_node_list.append(0)
        else:
          processing_node_list.append(-1)
      dumped_message_list = [self.dumpMessage(m) for m in batch]
      activity_tool.SQLQueue_writeMessageList(
        uid_list=uid_list,
        path_list=path_list,
        active_process_uid_list=active_process_uid_list,
        method_id_list=method_id_list,
        priority_list=priority_list,
        message_list=dumped_message_list,
        group_method_id_list=group_method_id_list,
        date_list=date_list,
        tag_list=tag_list,
        processing_node_list=processing_node_list,
        serialization_tag_list=serialization_tag_list)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
90

91
  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
    """
      Tell whether the SQL table holds at least one matching message.

      object may be None, in which case no path is given to the query;
      otherwise its physical path, joined with '/', is used.
      Returns 0 when activity_tool has no SQLQueue_hasMessage method or when
      the query returns no line, else the comparison message_count > 0
      taken from the first returned line.
    """
    hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
    if hasMessage is None:
      return 0
    if object is None:
      object_path = None
    else:
      object_path = '/'.join(object.getPhysicalPath())
    line_list = hasMessage(
      path=object_path,
      method_id=method_id,
      only_valid=only_valid,
      active_process_uid=active_process_uid)
    if len(line_list) > 0:
      return line_list[0].message_count > 0
    return 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
102

103
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove from this queue the messages which target object_path.

      object_path is a tuple. If method_id is given, only the messages for
      this method are concerned. Matching messages which are only registered
      on activity_tool are unregistered, and matching lines of the SQL
      table are deleted.

      If invoke is true, every matching message is first validated and
      invoked; ActivityFlushError is raised as soon as one message cannot
      be validated or is not in the MESSAGE_EXECUTED state after invocation.

      Nothing is done if activity_tool has no SQLBase_delMessage method.
      commit and the extra keyword arguments are not used here.
    """
    delMessage = getattr(activity_tool, 'SQLBase_delMessage', None)
    if delMessage is None:
      return
    path = '/'.join(object_path)

    # First pass: messages registered on the activity tool
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path != m.object_path:
        continue
      if method_id is not None and method_id != m.method_id:
        continue
      if invoke:
        # Validate before trying to invoke
        validate_value = m.validate(self, activity_tool)
        if validate_value is VALID:
          # What happens if invoke calls flushActivity ??
          activity_tool.invoke(m)
          if m.getExecutionState() != MESSAGE_EXECUTED:
            # The message could not be invoked
            raise ActivityFlushError(
              'Could not evaluate %s on %s' % (m.method_id , path))
        elif validate_value is INVALID_PATH:
          # The document of the message no longer exists
          raise ActivityFlushError(
            'The document %s does not exist' % path)
        else:
          raise ActivityFlushError(
            'Could not validate %s on %s' % (m.method_id , path))
      activity_tool.unregisterMessage(self, m)

    # Second pass: messages stored in the SQL table
    query_kw = {'processing': 0, 'path': path}
    if method_id:
      query_kw['method_id'] = method_id
    result = self._getMessageList(activity_tool, **query_kw)
    for line in result:
      line_path = line.path
      line_method_id = line.method_id
      m = self.loadMessage(line.message, uid=line.uid, line=line)
      if not invoke:
        continue
      # Validate only if the message is marked as new
      if line.processing_node == -1:
        validate_value = m.validate(self, activity_tool)
      else:
        validate_value = VALID
      if validate_value is VALID:
        # What happens if invoke calls flushActivity ??
        activity_tool.invoke(m)
        if m.getExecutionState() != MESSAGE_EXECUTED:
          # The message could not be invoked
          raise ActivityFlushError(
            'Could not evaluate %s on %s' % (line_method_id , line_path))
      elif validate_value is INVALID_PATH:
        # The document of the message no longer exists
        raise ActivityFlushError(
          'The document %s does not exist' % line_path)
      else:
        raise ActivityFlushError(
          'Could not validate %s on %s' % (m.method_id , line_path))
    if result:
      delMessage(table=self.sql_table, uid=[line.uid for line in result])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
159

160 161 162
  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

    tag, path and method_id may each be given as a single string, which is
    then wrapped into a one-element list before being passed to the query.
    The count is read from the uid_count attribute of the first line
    returned by SQLQueue_validateMessageList. Extra keyword arguments are
    ignored.
    """
    def as_list(value):
      # A lone string stands for a list holding only that string
      if isinstance(value, str):
        return [value]
      return value

    tag = as_list(tag)
    path = as_list(path)
    method_id = as_list(method_id)
    line_list = activity_tool.SQLQueue_validateMessageList(
      method_id=method_id,
      path=path,
      message_uid=message_uid,
      tag=tag,
      serialization_tag=None,
      count=1)
    return line_list[0].uid_count

  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages whose tag is value.

    This is countMessage called with value as the tag.
    """
    message_count = self.countMessage(activity_tool, tag=value)
    return message_count
Sebastien Robin's avatar
Sebastien Robin committed
182

183 184 185
  def dumpMessageList(self, activity_tool):
    """
      Return the list of messages loaded from every line returned by the
      SQLQueue_dumpMessageList method of activity_tool.

      An empty list is returned if activity_tool has no such method.
    """
    dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
    if dumpMessageList is None:
      return []
    return [self.loadMessage(line.message, uid=line.uid, line=line)
            for line in dumpMessageList()]
193

194
  def distribute(self, activity_tool, node_count):
    """
      Select messages with processing_node -1 and assign them to
      processing_node 0.

      Lines are read READ_MESSAGE_LIMIT at a time with processing_node=-1,
      processing=0 and to_date set to the date returned by getNow. The
      transaction is committed after each non-empty read. Each loaded
      message is handed to getExecutableMessageList, which fills
      message_dict. Among the messages of message_dict, the following are
      assigned:
        - every message without serialization_tag;
        - for each serialization_tag, the first message in sort_message_key
          order, plus the messages sharing its group_method_id (unless this
          group_method_id is the NUL character).

      The method returns when a read gives no line, or once at least
      MAX_VALIDATED_LIMIT messages have been assigned. Nothing is done if
      activity_tool has no SQLBase_assignMessage method.
      node_count is not used.
    """
    assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
    if assignMessage is None:
      return
    offset = 0
    validated_count = 0
    now_date = self.getNow(activity_tool)
    while True:
      line_list = self._getMessageList(
        activity_tool,
        processing_node=-1,
        to_date=now_date,
        processing=0,
        offset=offset,
        count=READ_MESSAGE_LIMIT)
      if not line_list:
        return
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in line_list:
        message = self.loadMessage(line.message, uid=line.uid, line=line)
        if not hasattr(message, 'order_validation_text'): # BBB
          message.order_validation_text = self.getOrderValidationText(message)
        self.getExecutableMessageList(
          activity_tool, message, message_dict, validation_text_dict,
          now_date=now_date)

      if message_dict:
        uid_set = set()
        # Messages with a serialization_tag are grouped by tag, the other
        # ones are distributable right away.
        tagged_message_dict = {}
        for message in message_dict.itervalues():
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            uid_set.add(message.uid)
            continue
          tagged_message_dict.setdefault(serialization_tag, []).append(message)
        for tagged_message_list in tagged_message_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          tagged_message_list.sort(key=sort_message_key)
          first_message = tagged_message_list[0]
          uid_set.add(first_message.uid)
          group_method_id = first_message.line.group_method_id
          if group_method_id != '\0':
            # Messages of the same group are distributed with the first one
            uid_set.update(
              other.uid for other in tagged_message_list[1:]
              if group_method_id == other.line.group_method_id)
        distributable_count = len(uid_set)
        if distributable_count:
          assignMessage(table=self.sql_table,
            processing_node=0, uid=tuple(uid_set))
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      offset += READ_MESSAGE_LIMIT
244

245
  # Validation private methods
246 247
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    """
      Return the list of messages loaded from the lines which
      SQLQueue_validateMessageList returns for the given criteria.

      method_id, path and tag may each be given as a single string, which
      is then wrapped into a one-element list. If no criterion has a true
      value, no query is made and an empty list is returned.
      order_validation_text is computed for the loaded messages which do
      not have it yet.
    """
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

    if not (method_id or message_uid or path or tag or serialization_tag):
      return []

    line_list = activity_tool.SQLQueue_validateMessageList(
      method_id=method_id,
      message_uid=message_uid,
      path=path,
      tag=tag,
      count=False,
      serialization_tag=serialization_tag)
    loaded_message_list = []
    for line in line_list:
      message = self.loadMessage(
        line.message,
        line=line,
        uid=line.uid,
        date=line.date,
        processing_node=line.processing_node)
      if not hasattr(message, 'order_validation_text'): # BBB
        message.order_validation_text = self.getOrderValidationText(message)
      loaded_message_list.append(message)
    return loaded_message_list

277
  # Required for tests (time shift)
278
  def timeShift(self, activity_tool, delay, processing_node = None):
    """
      To simulate a time shift, we simply subtract delay from
      all dates in the SQLQueue message table.

      delay and processing_node are passed as is to the SQLQueue_timeShift
      method of activity_tool.
    """
    shift = activity_tool.SQLQueue_timeShift
    shift(delay=delay, processing_node=processing_node)
284

285 286
  def getPriority(self, activity_tool):
    """
      Return the result of _getPriority, called with the
      SQLQueue_getPriority method of activity_tool and, as default value,
      the priority returned by SQLBase.getPriority.
    """
    priority_method = activity_tool.SQLQueue_getPriority
    default_priority = SQLBase.getPriority(self, activity_tool)
    return self._getPriority(activity_tool, priority_method, default_priority)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
290
# Register the SQLQueue class through ActivityTool.registerActivity
registerActivity(SQLQueue)