SQLDict.py 13.1 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
Jean-Paul Smets's avatar
Jean-Paul Smets committed
30 31
from Products.CMFActivity.ActivityTool import registerActivity
from RAMDict import RAMDict
32
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33 34 35

from zLOG import LOG

36
MAX_PRIORITY = 5
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37

38 39 40 41 42 43 44
priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1

45 46 47
class ActivityFlushError(Exception):
    """Error during active message flush"""

Jean-Paul Smets's avatar
Jean-Paul Smets committed
48 49 50 51 52 53
class SQLDict(RAMDict):
  """
    A simple OOBTree based queue. It should be compatible with transactions
    and provide sequentiality. Should not create conflict
    because use of OOBTree.
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
54
  # Transaction commit methods
55
  def prepareQueueMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
56 57 58 59 60 61
    if m.is_registered:
      activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) ,
                                          method_id = m.method_id,
                                          priority = m.activity_kw.get('priority', 1),
                                          message = self.dumpMessage(m))
                                          # Also store uid of activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
62

63 64
  def prepareDeleteMessage(self, activity_tool, m):
    # Erase all messages in a single transaction
Jean-Paul Smets's avatar
Jean-Paul Smets committed
65 66
    path = '/'.join(m.object_path)
    uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=m.method_id,processing_node=None)
67
    uid_list = map(lambda x:x.uid, uid_list)
68 69
    if len(uid_list)>0:
      activity_tool.SQLDict_delMessage(uid = uid_list) 
70
    
Jean-Paul Smets's avatar
Jean-Paul Smets committed
71 72
  # Registration management    
  def registerActivityBuffer(self, activity_buffer):
73 74 75
    class_name = self.__class__.__name__
    setattr(activity_buffer, '_%s_uid_dict' % class_name, {})  
    setattr(activity_buffer, '_%s_message_list' % class_name, [])  
Jean-Paul Smets's avatar
Jean-Paul Smets committed
76
            
Jean-Paul Smets's avatar
Jean-Paul Smets committed
77
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
78 79 80
    class_name = self.__class__.__name__
    uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
    return uid_dict.has_key((tuple(m.object_path), m.method_id))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
81 82 83
          
  def registerMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 1
84 85 86 87
    class_name = self.__class__.__name__
    uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
    uid_dict[(tuple(m.object_path), m.method_id)] = 1
    getattr(activity_buffer,'_%s_message_list' % class_name).append(m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
88 89 90
          
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
91 92 93 94
    class_name = self.__class__.__name__
    uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
    if uid_dict.has_key((tuple(m.object_path), m.method_id)):
      del uid_dict[(tuple(m.object_path), m.method_id)]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
95 96

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
97 98 99 100
    class_name = self.__class__.__name__
    if hasattr(activity_buffer,'_%s_message_list' % class_name):
      message_list = getattr(activity_buffer,'_%s_message_list' % class_name)
      return filter(lambda m: m.is_registered, message_list)
101 102
    else:
      return ()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
103 104
                
  # Queue semantic
Jean-Paul Smets's avatar
Jean-Paul Smets committed
105
  def dequeueMessage(self, activity_tool, processing_node):
106 107 108
    if hasattr(activity_tool,'SQLDict_readMessageList'):
      priority = random.choice(priority_weight)
      # Try to find a message at given priority level
Jean-Paul Smets's avatar
Jean-Paul Smets committed
109
      result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
      if len(result) == 0:
        # If empty, take any message
        priority = None
        result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
      if len(result) > 0:
        line = result[0]
        path = line.path
        method_id = line.method_id
        uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None )
        uid_list = map(lambda x:x.uid, uid_list)
        # Make sure message can not be processed anylonger
        if len(uid_list) > 0:
          activity_tool.SQLDict_processMessage(uid = uid_list)
        get_transaction().commit() # Release locks before starting a potentially long calculation
        # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
        m = self.loadMessage(line.message, uid = line.uid)
        # Make sure object exists
        if not m.validate(self, activity_tool):
128 129
          if line.priority > MAX_PRIORITY:
            # This is an error
Jean-Paul Smets's avatar
Jean-Paul Smets committed
130
            if len(uid_list) > 0:
131
              activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
132
                                                                              # Assign message back to 'error' state
133
            #m.notifyUser(activity_tool)                                       # Notify Error
134 135 136
            get_transaction().commit()                                        # and commit
          else:
            # Lower priority
Jean-Paul Smets's avatar
Jean-Paul Smets committed
137 138
            if len(uid_list) > 0:
              activity_tool.SQLDict_setPriority(uid = uid_list,
139
                                              priority = line.priority + 1)
140
            get_transaction().commit() # Release locks before starting a potentially long calculation
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
        else:
          # Try to invoke
          activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
          if m.is_executed:                                          # Make sure message could be invoked
            if len(uid_list) > 0:
              activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
            get_transaction().commit()                                        # If successful, commit
            if m.active_process:
              active_process = activity_tool.unrestrictedTraverse(m.active_process)
              if not active_process.hasActivity():
                # Not more activity
                m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
          else:
            get_transaction().abort()                                         # If not, abort transaction and start a new one
            if line.priority > MAX_PRIORITY:
              # This is an error
              if len(uid_list) > 0:
                activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
                                                                                # Assign message back to 'error' state
              m.notifyUser(activity_tool)                                       # Notify Error
              get_transaction().commit()                                        # and commit
            else:
              # Lower priority
              if len(uid_list) > 0:
                activity_tool.SQLDict_setPriority(uid = uid_list,
                                                  priority = line.priority + 1)
              get_transaction().commit() # Release locks before starting a potentially long calculation
        return 0
      get_transaction().commit() # Release locks before starting a potentially long calculation
Jean-Paul Smets's avatar
Jean-Paul Smets committed
170 171
    return 1

172
  def hasActivity(self, activity_tool, object, **kw):
173 174 175 176 177 178 179 180
    if hasattr(activity_tool,'SQLDict_readMessageList'):
      if object is not None:
        my_object_path = '/'.join(object.getPhysicalPath())
        result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw)
        if len(result) > 0:
          return result[0].message_count > 0
      else:
        return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
Jean-Paul Smets's avatar
Jean-Paul Smets committed
181 182
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
183
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
184 185
    """
      object_path is a tuple
Jean-Paul Smets's avatar
Jean-Paul Smets committed
186 187 188 189 190 191

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used
192 193

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
Jean-Paul Smets's avatar
Jean-Paul Smets committed
194 195
    """
    path = '/'.join(object_path)
196
    # LOG('Flush', 0, str((path, invoke, method_id)))
197
    method_dict = {}
198 199 200 201 202
    if hasattr(activity_tool,'SQLDict_readMessageList'):
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
        if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
          activity_tool.unregisterMessage(self, m)
203 204 205
          #if not method_dict.has_key(method_id or m.method_id):
          if not method_dict.has_key((tuple(object_path),method_id or m.method_id)):
            method_dict[(tuple(object_path),method_id or m.method_id)] = 1 # Prevents calling invoke twice
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
            if invoke:
              # First Validate
              if m.validate(self, activity_tool):
                activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
                if not m.is_executed:                                                 # Make sure message could be invoked
                  # The message no longer exists
                  raise ActivityFlushError, (
                      'Could not evaluate %s on %s' % (method_id , path))
              else:
                # The message no longer exists
                raise ActivityFlushError, (
                    'The document %s does not exist' % path)               
      # Parse each message in SQL dict
      result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
      for line in result:
        path = line.path
        method_id = line.method_id
Jean-Paul Smets's avatar
Jean-Paul Smets committed
223
        if not method_dict.has_key(method_id):
224 225 226 227
          # Only invoke once (it would be different for a queue)
          method_dict[method_id] = 1
          m = self.loadMessage(line.message, uid = line.uid)
          self.deleteMessage(activity_tool, m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
228 229 230 231 232 233 234 235 236 237 238
          if invoke:
            # First Validate
            if m.validate(self, activity_tool):
              activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
              if not m.is_executed:                                                 # Make sure message could be invoked
                # The message no longer exists
                raise ActivityFlushError, (
                    'Could not evaluate %s on %s' % (method_id , path))
            else:
              # The message no longer exists
              raise ActivityFlushError, (
239
                  'The document %s does not exist' % path)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
240

Jean-Paul Smets's avatar
Jean-Paul Smets committed
241
  def getMessageList(self, activity_tool, processing_node=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
242
    # YO: reading all lines might cause a deadlock
Jean-Paul Smets's avatar
Jean-Paul Smets committed
243
    message_list = []
244 245 246 247 248 249 250
    if hasattr(activity_tool,'SQLDict_readMessageList'):
      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
      for line in result:
        m = self.loadMessage(line.message, uid = line.uid)
        m.processing_node = line.processing_node
        m.priority = line.priority
        message_list.append(m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
251 252
    return message_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
253 254
  def distribute(self, activity_tool, node_count):
    processing_node = 1
255 256 257 258 259 260 261 262 263 264 265 266 267 268
    if hasattr(activity_tool,'SQLDict_readMessageList'):
      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
      get_transaction().commit() # Release locks before starting a potentially long calculation
      path_dict = {}
      for line in result:
        path = line.path
        if not path_dict.has_key(path):
          # Only assign once (it would be different for a queue)
          path_dict[path] = 1
          activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None)
          get_transaction().commit() # Release locks immediately to allow processing of messages
          processing_node = processing_node + 1
          if processing_node > node_count:
            processing_node = 1 # Round robin
Jean-Paul Smets's avatar
Jean-Paul Smets committed
269

Jean-Paul Smets's avatar
Jean-Paul Smets committed
270
registerActivity(SQLDict)