ActivityBuffer.py 5.7 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
#
# Based on: db.py in ZMySQLDA
#
##############################################################################

from Shared.DC.ZRDB.TM import TM
from zLOG import LOG, ERROR, INFO
Jean-Paul Smets's avatar
Jean-Paul Smets committed
28
import sys
Jean-Paul Smets's avatar
Jean-Paul Smets committed
29

30 31 32 33 34
# Bind get_transaction to transaction.get when the `transaction` package can
# be imported. When it cannot, the ImportError is ignored and this module does
# not define the name itself (presumably older Zope releases expose
# get_transaction as a builtin -- TODO confirm).
try:
  from transaction import get as get_transaction
except ImportError:
  pass

35 36 37 38
# python2.3 compatibility: fall back to sets.Set only when there is no
# builtin set type.
# Probe the name directly instead of inspecting globals()['__builtins__']:
# that entry may be either the builtins module or its dict, and hasattr() on
# the dict never finds 'set', which forced the sets import on every
# interpreter.
try:
  set
except NameError:
  from sets import Set as set

Jean-Paul Smets's avatar
Jean-Paul Smets committed
39
class ActivityBuffer(TM):
  """Per-transaction buffer of activity messages to queue or to delete.

  deferredQueueMessage and deferredDeleteMessage only remember
  (activity, message) pairs. The pairs are handed to the activities'
  prepare* methods by tpc_prepare (installed as a before-commit hook),
  to their finish* methods by _finish, and discarded by _abort.
  """

  _p_oid=_p_changed=_registered=None

  def __init__(self, activity_tool=None):
    """Initialise the buffer. activity_tool is accepted but not used."""
    self.requires_prepare = 0

  def _getBuffer(self):
    """Return the object holding the buffered data (currently self).

    The storage attributes are created the first time they are needed.
    """
    buffer = self
    # Create attributes only if they are not present.
    if not hasattr(buffer, 'queued_activity'):
      buffer.queued_activity = []
      buffer.flushed_activity = []
      buffer.message_list_dict = {}
      buffer.uid_set_dict = {}
    return buffer

  def _clearBuffer(self):
    """Empty every container of the buffer in place."""
    buffer = self._getBuffer()
    del buffer.queued_activity[:]
    del buffer.flushed_activity[:]
    buffer.message_list_dict.clear()
    buffer.uid_set_dict.clear()

  def getMessageList(self, activity):
    """Return the message list kept for activity, creating it if needed."""
    buffer = self._getBuffer()
    return buffer.message_list_dict.setdefault(activity, [])

  def getUidSet(self, activity):
    """Return the uid set kept for activity, creating it if needed."""
    buffer = self._getBuffer()
    return buffer.uid_set_dict.setdefault(activity, set())

  def _register(self, activity_tool):
    """Hook this buffer into the current transaction, then call TM._register.

    NOTE(review): this is called for every deferred message, so
    _beginAndHook installs one before-commit hook per message. tpc_prepare
    clears requires_prepare on its first run and returns early while the
    flag is cleared.
    """
    self._beginAndHook(activity_tool)
    TM._register(self)

  # Keeps a list of messages to add and remove
  # at end of transaction
  def _beginAndHook(self, activity_tool):
    """Register this buffer on every activity and install the commit hook.

    The hook calls tpc_prepare(transaction, activity_tool=activity_tool).
    Any exception is logged and re-raised.
    """
    from ActivityTool import activity_list
    self.requires_prepare = 1
    try:

      # Reset registration for each transaction.
      for activity in activity_list:
        activity.registerActivityBuffer(self)

      # In Zope 2.8 (ZODB 3.4), use a before-commit hook instead of
      # patching Transaction.
      transaction = get_transaction()
      add_hook = getattr(transaction, 'addBeforeCommitHook', None)
      if add_hook is not None:
        # Newer API: the hook arguments are given as a tuple and a dict.
        add_hook(self.tpc_prepare, (transaction,),
                 {'activity_tool': activity_tool})
      else:
        try:
          # Older API: the hook arguments are given inline.
          transaction.beforeCommitHook(self.tpc_prepare, transaction,
                                       activity_tool=activity_tool)
        except AttributeError:
          # No before-commit hook support: install nothing, as before.
          pass
    except:
      LOG('ActivityBuffer', ERROR, "exception during _begin",
          error=sys.exc_info())
      raise

  def _finish(self, *ignored):
    """Finish every buffered deletion, then every buffered queuing.

    Uses the activity tool path recorded by tpc_prepare. Any exception is
    logged and re-raised; the buffer is cleared in all cases.
    """
    try:
      try:
        # Try to push / delete all messages
        buffer = self._getBuffer()
        for (activity, message) in buffer.flushed_activity:
          activity.finishDeleteMessage(self._activity_tool_path, message)
        for (activity, message) in buffer.queued_activity:
          activity.finishQueueMessage(self._activity_tool_path, message)
      except:
        LOG('ActivityBuffer', ERROR, "exception during _finish",
            error=sys.exc_info())
        raise
    finally:
      self._clearBuffer()

  def _abort(self, *ignored):
    """Discard everything that was buffered."""
    self._clearBuffer()

  def tpc_prepare(self, transaction, sub=None, activity_tool=None):
    """Hand the buffered messages to the activities' prepare* methods.

    transaction   -- not used by this method.
    sub           -- when not None this is a subtransaction: nothing is
                     prepared.
    activity_tool -- required; its physical path is recorded for _finish
                     (even for subtransactions) and it is passed to the
                     activities.

    Runs at most once per requires_prepare setting. Any exception raised
    by an activity is logged and re-raised.
    """
    assert activity_tool is not None
    self._activity_tool_path = activity_tool.getPhysicalPath()
    # Do nothing if it is a subtransaction
    if sub is not None:
      return

    if not self.requires_prepare:
      return

    self.requires_prepare = 0
    try:
      # Try to push / delete all messages
      buffer = self._getBuffer()
      for (activity, message) in buffer.flushed_activity:
        activity.prepareDeleteMessage(activity_tool, message)
      # Group queued messages by activity so that activities offering a
      # bulk method receive all their messages in a single call.
      activity_dict = {}
      for (activity, message) in buffer.queued_activity:
        activity_dict.setdefault(activity, []).append(message)
      for activity, message_list in activity_dict.iteritems():
        if hasattr(activity, 'prepareQueueMessageList'):
          activity.prepareQueueMessageList(activity_tool, message_list)
        else:
          for message in message_list:
            activity.prepareQueueMessage(activity_tool, message)
    except:
      LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
          error=sys.exc_info())
      raise

  def deferredQueueMessage(self, activity_tool, activity, message):
    """Buffer message for queuing, unless activity already registered it."""
    self._register(activity_tool)
    # Activity is called to prevent queuing some messages (useful for example
    # to prevent reindexing objects multiple times)
    if not activity.isMessageRegistered(self, activity_tool, message):
      buffer = self._getBuffer()
      buffer.queued_activity.append((activity, message))
      # We register queued messages so that we can
      # unregister them
      activity.registerMessage(self, activity_tool, message)

  def deferredDeleteMessage(self, activity_tool, activity, message):
    """Buffer message for deletion."""
    self._register(activity_tool)
    buffer = self._getBuffer()
    buffer.flushed_activity.append((activity, message))

  def sortKey(self, *ignored):
    """Activities must be finished before databases commit transactions."""
    return -1