SQLDict.py 8.34 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
Jean-Paul Smets's avatar
Jean-Paul Smets committed
30 31 32 33 34
from Products.CMFActivity.ActivityTool import registerActivity
from RAMDict import RAMDict

from zLOG import LOG

35
# Maximum number of times the invocation of a single message is attempted
# before the message is given up on.
MAX_RETRY = 5

# Special values stored as the processing_node of a message. Regular
# processing nodes are numbered from 1 (see SQLDict.distribute).
DISTRIBUTABLE_STATE = -1   # not yet assigned to any processing node
INVOKE_ERROR_STATE = -2    # invocation still failed after MAX_RETRY attempts
VALIDATE_ERROR_STATE = -3  # the message did not validate

41 42 43 44 45 46 47
# Lottery table for random.choice() in SQLDict.dequeueMessage: each priority
# level appears as many times as its chance (out of 100 entries) of being
# tried first.
priority_weight = (
  [1] * 64 +   # priority 1: 64 entries
  [2] * 20 +   # priority 2: 20 entries
  [3] * 10 +   # priority 3: 10 entries
  [4] * 5 +    # priority 4: 5 entries
  [5] * 1      # priority 5: 1 entry
)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
48 49 50 51 52 53 54 55
class SQLDict(RAMDict):
  """
    An activity queue whose messages are written, read and deleted through
    the SQLDict_* methods of the activity tool. It should be compatible
    with transactions. When an object is flushed, pending messages are
    invoked only once per method id.
  """

  def queueMessage(self, activity_tool, m):
    """
      Store message m through activity_tool.SQLDict_writeMessage.

      The message is recorded under the '/'-joined object path and the
      method id, together with its serialized form (self.dumpMessage).
      The priority is taken from m.activity_kw and defaults to 1.
    """
    message_path = '/'.join(m.object_path)
    message_method_id = m.method_id
    message_priority = m.activity_kw.get('priority', 1)
    serialized_message = self.dumpMessage(m)
    activity_tool.SQLDict_writeMessage(path=message_path,
                                       method_id=message_method_id,
                                       priority=message_priority,
                                       message=serialized_message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
60

Jean-Paul Smets's avatar
Jean-Paul Smets committed
61
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Pick one message assigned to processing_node and try to invoke it.

      A priority level is drawn at random from priority_weight; if no message
      is found at that level, any message of the node is taken instead.

      Returns 1 when there was no message to process, 0 when a message was
      taken (whether it was executed, failed or did not validate).
    """
    drawn_priority = random.choice(priority_weight)
    # First look for a message at the drawn priority level
    rows = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=drawn_priority)
    if len(rows) == 0:
      # Nothing at that level: fall back to any message
      rows = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=None)
    if len(rows) == 0:
      # Nothing to do at all
      get_transaction().commit() # Release locks before starting a potentially long calculation
      return 1

    row = rows[0]
    path = row.path
    method_id = row.method_id
    # Mark the message so that it can not be picked up any longer
    activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node=processing_node)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    m = self.loadMessage(row.message)

    # Validate and invoke, at most MAX_RETRY times. Validation is redone on
    # every attempt (XXX in case someone is deleting it at the same time).
    attempts_left = MAX_RETRY
    while attempts_left > 0:
      if not m.validate(self, activity_tool):
        valid = 0
        break
      valid = 1
      activity_tool.invoke(m) # Try to invoke the message
      if m.is_executed:
        break
      get_transaction().abort() # Abort and retry
      attempts_left = attempts_left - 1

    if not valid:
      # Park the message in the 'validation error' state, and commit
      activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node=VALIDATE_ERROR_STATE)
      get_transaction().commit()
    elif m.is_executed:
      # The message could be invoked: delete it and commit
      activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node)
      get_transaction().commit()
    else:
      # Invocation kept failing: abort the transaction and start a new one,
      # park the message in the 'invocation error' state, and commit
      get_transaction().abort()
      activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node=INVOKE_ERROR_STATE)
      get_transaction().commit()
    return 0

  def hasActivity(self, activity_tool, object, method_id=None, **kw):
    """
      Tell whether messages are pending for object.

      The object is looked up by its '/'-joined physical path through
      activity_tool.SQLDict_hasMessage; method_id is passed on to that
      query as is. Returns 0 when the query yields no row, otherwise
      whether the message_count of the first row is positive.
    """
    rows = activity_tool.SQLDict_hasMessage(path='/'.join(object.getPhysicalPath()),
                                            method_id=method_id)
    if len(rows) == 0:
      return 0
    return rows[0].message_count > 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
113
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove the pending messages of an object, optionally invoking them
      first.

      object_path is a tuple
      method_id restricts the flush to one method id (passed as is to the
        SQLDict_* queries)
      invoke chooses what happens to the messages
        - if true, each distinct method id is invoked once and its messages
          are deleted only if the invocation succeeded
        - if false, the messages are deleted without being loaded

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used
    """
    path = '/'.join(object_path)
    # LOG('Flush', 0, str((path, invoke, method_id)))
    result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id, processing_node=None)
    if commit: get_transaction().commit() # Release locks before starting a potentially long calculation

    if not invoke:
      activity_tool.SQLDict_delMessage(path=path, method_id=method_id)  # Delete all
      # Commit flush. This used to call abort(), which discards the
      # transaction the deletion was made in instead of making it durable.
      if commit: get_transaction().commit()
      return

    invoked_method_dict = {}
    for line in result:
      line_path = line.path
      line_method_id = line.method_id
      if line_method_id in invoked_method_dict:
        # Only invoke once (it would be different for a queue)
        continue
      invoked_method_dict[line_method_id] = 1
      m = self.loadMessage(line.message)
      retry = 0
      while retry < MAX_RETRY:
        if m.validate(self, activity_tool): # We should validate each time XXX in case someone is deleting it at the same time
          valid = 1
          activity_tool.invoke(m) # Try to invoke the message
          if m.is_executed:
            retry = MAX_RETRY
          else:
            # NOTE(review): this abort is issued even when commit is false,
            # i.e. also when flush runs inside a larger transaction
            get_transaction().abort() # Abort and retry
            retry = retry + 1
        else:
          valid = 0
          retry = MAX_RETRY
      if valid:
        if m.is_executed:                       # Make sure message could be invoked
          activity_tool.SQLDict_delMessage(path=line_path, method_id=line_method_id, processing_node=None)  # Delete it
          if commit: get_transaction().commit() # If successful, commit
        else:
          if commit: get_transaction().abort()  # If not, abort transaction and start a new one
Jean-Paul Smets's avatar
Jean-Paul Smets committed
158

Jean-Paul Smets's avatar
Jean-Paul Smets committed
159
  def getMessageList(self, activity_tool, processing_node=None):
    """
      Return the list of all messages read through
      activity_tool.SQLDict_readMessageList.

      Each message is loaded with self.loadMessage and gets a
      processing_node attribute copied from its row.

      Note: the processing_node argument is not used; the query is always
      made with processing_node=None.
    """
    rows = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
    loaded_list = []
    for row in rows:
      message = self.loadMessage(row.message)
      message.processing_node = row.processing_node
      loaded_list.append(message)
    return loaded_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
  def distribute(self, activity_tool, node_count):
    """
      Assign the messages which are in DISTRIBUTABLE_STATE to processing
      nodes.

      Nodes are numbered from 1 to node_count and are used in turn (round
      robin). All messages of a given path are assigned with a single call,
      so each path is handled once. The transaction is committed after the
      initial read and after every assignment.
    """
    # Only assign non assigned messages
    result = activity_tool.SQLDict_readMessageList(path=None, method_id=None,
                                                   processing_node=DISTRIBUTABLE_STATE)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    processing_node = 1
    path_dict = {}
    for line in result:
      path = line.path
      if path in path_dict:
        # Only assign once (it would be different for a queue)
        continue
      path_dict[path] = 1
      activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node)
      get_transaction().commit() # Release locks immediately to allow processing of messages
      processing_node = processing_node + 1
      if processing_node > node_count:
        processing_node = 1 # Round robin

Jean-Paul Smets's avatar
Jean-Paul Smets committed
184
registerActivity(SQLDict)