Queue.py 8.04 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import pickle, sys
30
from Acquisition import aq_base
31
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
32 33
from Products.CMFActivity.ActivityTool import Message
from zLOG import LOG
Yoshinori Okuji's avatar
Yoshinori Okuji committed
34
from ZODB.POSException import ConflictError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35

36 37 38 39 40 41
# Error values for message validation
EXCEPTION      = -1
VALID          = 0
INVALID_PATH   = 1
INVALID_ORDER  = 2

42
# Time global parameters
43 44
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 30 # in seconds
45

Jean-Paul Smets's avatar
Jean-Paul Smets committed
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
class Queue:
  """
    Step 1: use lists

    Step 2: add some object related dict which prevents calling twice the same method

    Step 3: add some time information for deferred execution

    Step 4: use MySQL as a way to store events (with locks)

    Step 5: use periodic Timer to wakeup Scheduler

    Step 6: add multiple threads on a single Scheduler

    Step 7: add control thread to kill "events which last too long"

    Some data:

    - reindexObject = 50 ms

    - calling a MySQL read = 0.7 ms

    - calling a simple method by HTTP = 30 ms

    - calling a complex method by HTTP = 500 ms

    References:

    http://www.mysql.com/doc/en/InnoDB_locking_reads.html
    http://www.python.org/doc/current/lib/thread-objects.html
    http://www-poleia.lip6.fr/~briot/actalk/actalk.html
  """

  #scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']

  def __init__(self):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83
    self.is_alive = {}
    self.is_awake = {}
Jean-Paul Smets's avatar
Jean-Paul Smets committed
84
    self.is_initialized = 0
85
    self.max_processing_date = DateTime()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
86 87 88 89 90 91 92 93

  def initialize(self, activity_tool):
    # This is the only moment when
    # we can set some global variables related
    # to the ZODB context
    if not self.is_initialized:
      self.is_initialized = 1

Jean-Paul Smets's avatar
Jean-Paul Smets committed
94
  def queueMessage(self, activity_tool, m):    
95 96 97
    activity_tool.deferredQueueMessage(self, m)  
  
  def deleteMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
98 99 100
    if not getattr(m, 'is_deleted', 0):
      # We try not to delete twice
      # However this can not be garanteed in the case of messages loaded from SQL
Jean-Paul Smets's avatar
Jean-Paul Smets committed
101
      activity_tool.deferredDeleteMessage(self, m)  
Jean-Paul Smets's avatar
Jean-Paul Smets committed
102
    m.is_deleted = 1
103
    
Jean-Paul Smets's avatar
Jean-Paul Smets committed
104
  def dequeueMessage(self, activity_tool, processing_node):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
105 106
    pass

Jean-Paul Smets's avatar
Jean-Paul Smets committed
107 108 109 110
  def tic(self, activity_tool, processing_node):
    # Tic should return quickly to prevent locks or commit transactions at some point
    if self.dequeueMessage(activity_tool, processing_node):
      self.sleep(activity_tool, processing_node)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
111

Jean-Paul Smets's avatar
Jean-Paul Smets committed
112 113 114 115 116
  def distribute(self, activity_tool, node_count):
    pass

  def sleep(self, activity_tool, processing_node):
    self.is_awake[processing_node] = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
117

Jean-Paul Smets's avatar
Jean-Paul Smets committed
118 119
  def wakeup(self, activity_tool, processing_node):
    self.is_awake[processing_node] = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
120

Jean-Paul Smets's avatar
Jean-Paul Smets committed
121 122 123
  def terminate(self, activity_tool, processing_node):
    self.is_awake[processing_node] = 0
    self.is_alive[processing_node] = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
124

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
  def validate(self, activity_tool, message, **kw):
    """
      This is the place where activity semantics is implemented
      **kw contains all parameters which allow to implement synchronisation,
      constraints, delays, etc.
      
      Standard synchronisation parameters:
      
      after_method_id   --  never validate message if after_method_id
                            is in the list of methods which are
                            going to be executed
    
      after_message_uid --  never validate message if after_message_uid
                            is in the list of messages which are
                            going to be executed
    
      after_path        --  never validate message if after_path
                            is in the list of path which are
                            going to be executed                                                        
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
145 146 147 148 149
    try:
      if activity_tool.unrestrictedTraverse(message.object_path) is None:
        # Do not try to call methods on objects which do not exist
        LOG('WARNING ActivityTool', 0,
           'Object %s does not exist' % '/'.join(message.object_path))
150 151
        return INVALID_PATH
      for k, v in kw.items():
152
        if activity_tool.validateOrder(message, k, v):
153
          return INVALID_ORDER
Yoshinori Okuji's avatar
Yoshinori Okuji committed
154 155
    except ConflictError:
      raise
Jean-Paul Smets's avatar
Jean-Paul Smets committed
156 157
    except:
      LOG('WARNING ActivityTool', 0,
158 159
          'Validation of Object %s raised exception' % '/'.join(message.object_path),
          error=sys.exc_info())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
160
      # Do not try to call methods on objects which cause errors
Sebastien Robin's avatar
Sebastien Robin committed
161
      import pdb;pdb.set_trace()
162 163
      return EXCEPTION
    return VALID
Jean-Paul Smets's avatar
Jean-Paul Smets committed
164

Jean-Paul Smets's avatar
Jean-Paul Smets committed
165 166
  def isAwake(self, activity_tool, processing_node):
    return self.is_awake[processing_node]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
167

168
  def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
169 170
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
171
  def flush(self, activity_tool, object, **kw):    
Jean-Paul Smets's avatar
Jean-Paul Smets committed
172 173
    pass

174 175 176 177 178 179 180 181
  def start(self, active_process=None):
    # Start queue / activities in queue for given process
    pass

  def stop(self, active_process=None):
    # Stop queue / activities in queue for given process
    pass

182 183 184 185
  def loadMessage(self, s, **kw):
    m = pickle.loads(s)
    m.__dict__.update(kw)
    return m
Jean-Paul Smets's avatar
Jean-Paul Smets committed
186 187 188 189

  def dumpMessage(self, m):
    return pickle.dumps(m)

190
  def getMessageList(self, activity_tool, processing_node=None,**kw):
191
    return []  
192

Sebastien Robin's avatar
Sebastien Robin committed
193 194 195 196 197 198
  def countMessage(self, activity_tool,**kw):
    return 0

  def countMessageWithTag(self, activity_tool,value):
    return 0
  
199 200
  # Transaction Management
  def prepareQueueMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
201
    # Called to prepare transaction commit for queued messages
202
    pass
203 204

  def finishQueueMessage(self, activity_tool_path, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
205
    # Called to commit queued messages
206 207
    pass

Sebastien Robin's avatar
Sebastien Robin committed
208
  def prepareDeleteMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
209
    # Called to prepare transaction commit for deleted messages
210
    pass
211 212

  def finishDeleteMessage(self, activity_tool_path, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
213
    # Called to commit deleted messages
214
    pass
215

Jean-Paul Smets's avatar
Jean-Paul Smets committed
216 217 218
  # Registration Management
  def registerActivityBuffer(self, activity_buffer):
    class_name = self.__class__.__name__
Jean-Paul Smets's avatar
Jean-Paul Smets committed
219
    setattr(activity_buffer, '_%s_message_list' % class_name, [])  
220

Jean-Paul Smets's avatar
Jean-Paul Smets committed
221 222 223
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    class_name = self.__class__.__name__
    return m in getattr(activity_buffer, '_%s_message_list' % class_name)
224

Jean-Paul Smets's avatar
Jean-Paul Smets committed
225 226 227 228
  def registerMessage(self, activity_buffer, activity_tool, m):
    class_name = self.__class__.__name__
    getattr(activity_buffer, '_%s_message_list' % class_name).append(m)
    m.is_registered = 1
229

Jean-Paul Smets's avatar
Jean-Paul Smets committed
230 231
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 0
232

Jean-Paul Smets's avatar
Jean-Paul Smets committed
233 234
  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    class_name = self.__class__.__name__
235
    if hasattr(activity_buffer, '_%s_message_list' % class_name):
236
      return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
237
    else:
238 239 240 241
      return ()

  # Required for tests (time shift)
  def timeShift(self, activity_tool, delay):
242 243 244
    """
      delay is provided in fractions of day
    """
245
    pass