RAMDict.py 6.85 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from Products.CMFActivity.ActivityTool import registerActivity
30
from Products.CMFActivity.Errors import ActivityFlushError
31
from Queue import Queue, VALID
Jean-Paul Smets's avatar
Jean-Paul Smets committed
32 33 34

from zLOG import LOG

35 36 37 38 39
try:
  from transaction import get as get_transaction
except ImportError:
  pass

Jean-Paul Smets's avatar
Jean-Paul Smets committed
40 41 42 43 44 45 46 47 48 49 50
class RAMDict(Queue):
  """
    A simple RAM based queue. It is not compatible with transactions which
    means methods can be called before an object even exists or before
    it is modified. This also means there is no garantee on any kind of sequenciality.

    Dictionnary is global.
  """

  def __init__(self):
    Queue.__init__(self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
51
    self.queue_dict = {}
Jean-Paul Smets's avatar
Jean-Paul Smets committed
52

53 54 55 56
  def getDict(self, activity_tool_path):
    return self.queue_dict.setdefault(activity_tool_path, {})

  def finishQueueMessage(self, activity_tool_path, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
57
    if m.is_registered:
58
      self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] = m
Jean-Paul Smets's avatar
Jean-Paul Smets committed
59

60 61
  def finishDeleteMessage(self, activity_tool_path, message):
    for key, m in self.getDict(activity_tool_path).items():
62
      if m.object_path == message.object_path and m.method_id == message.method_id:
63
        del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
64

Jean-Paul Smets's avatar
Jean-Paul Smets committed
65
  def registerActivityBuffer(self, activity_buffer):
66
    pass
67

Jean-Paul Smets's avatar
Jean-Paul Smets committed
68
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
69 70
    uid_set = activity_buffer.getUidSet(self)
    return (tuple(m.object_path), m.method_id) in uid_set
71

Jean-Paul Smets's avatar
Jean-Paul Smets committed
72
  def registerMessage(self, activity_buffer, activity_tool, m):
73 74 75 76
    message_list = activity_buffer.getMessageList(self)
    message_list.append(m)
    uid_set = activity_buffer.getUidSet(self)
    uid_set.add((tuple(m.object_path), m.method_id))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
77
    m.is_registered = 1
78

Jean-Paul Smets's avatar
Jean-Paul Smets committed
79
  def dequeueMessage(self, activity_tool, processing_node):
80 81
    path = activity_tool.getPhysicalPath()
    if len(self.getDict(path).keys()) is 0:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82
      return 1  # Go to sleep
83
    for key, m in self.getDict(path).items():
84
      if m.validate(self, activity_tool) is VALID:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
85
        activity_tool.invoke(m)
86
        if m.is_executed:
87
          del self.getDict(path)[key]
88 89 90 91 92
          get_transaction().commit()
          return 0
        else:
          # Start a new transaction and keep on to next message
          get_transaction().commit()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93 94
    return 1

95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  def countMessage(self, activity_tool,path=None,method_id=None,**kw):
    tool_path = activity_tool.getPhysicalPath()
    count = 0
    for (key,m) in self.getDict(tool_path).items():
      add = 1
      if path is not None:
        object_path = '/'.join(m.object_path)
        if object_path != path:
          add = 0
      if method_id is not None:
        if m.method_id != method_id:
          add = 0
      count += add
    return count

110
  def hasActivity(self, activity_tool, object, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
111 112 113
    if object is not None:
      object_path = object.getPhysicalPath()
    else:
114 115 116 117
      object_path = None
    active_process = kw.get('active_process', None)
    path = activity_tool.getPhysicalPath()
    for m in self.getDict(path).values():
Jean-Paul Smets's avatar
Jean-Paul Smets committed
118 119 120 121
      # Filter active process and path if defined
      if active_process is None or m.active_process == active_process:
        if object_path is None or m.object_path == object_path:
          return 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
122 123
    return 0

124
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
125 126 127 128 129 130
    path = '/'.join(object_path)
    # LOG('Flush', 0, str((path, invoke, method_id)))
    method_dict = {}
    # Parse each message in registered
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (method_id is None or method_id == m.method_id):
131
        if not method_dict.has_key(m.method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
132 133
          if invoke:
            # First Validate
134
            if m.validate(self, activity_tool) is VALID:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
135 136 137 138 139
              activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
              if not m.is_executed:                                                 # Make sure message could be invoked
                # The message no longer exists
                raise ActivityFlushError, (
                    'Could not evaluate %s on %s' % (method_id , path))
140 141 142
              else:
                method_dict[m.method_id] = 1
                activity_tool.unregisterMessage(self, m) 
Jean-Paul Smets's avatar
Jean-Paul Smets committed
143 144 145
            else:
              # The message no longer exists
              raise ActivityFlushError, (
146 147 148
                  'The document %s does not exist' % path) 
          else:
            method_dict[m.method_id] = 1
149 150
            activity_tool.unregisterMessage(self, m)
        else:
151 152
          method_dict[m.method_id] = 1
          activity_tool.unregisterMessage(self, m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
153
    # Parse each message in RAM dict
154 155
    path = activity_tool.getPhysicalPath()
    for key, m in self.getDict(path).items():
156
      if object_path == m.object_path and (method_id is None or method_id == m.method_id):
157 158
        if not method_dict.has_key(m.method_id):
          LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
159
          if invoke:
160 161 162 163 164 165 166
            activity_tool.invoke(m)
            if m.is_executed:
              method_dict[m.method_id] = 1
              self.deleteMessage(activity_tool, m)
          else:
            method_dict[m.method_id] = 1
            self.deleteMessage(activity_tool, m)
167
        else:
168
          self.deleteMessage(activity_tool, m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
169

170
  def getMessageList(self, activity_tool, processing_node=None,**kw):
171
    new_queue = []
172 173
    path = activity_tool.getPhysicalPath()
    for m in self.getDict(path).values():
174 175 176 177
      m.processing_node = 1
      m.priority = 0
      new_queue.append(m)
    return new_queue
178

Jean-Paul Smets's avatar
Jean-Paul Smets committed
179
registerActivity(RAMDict)