Transaction.py 17.3 KB
Newer Older
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1
##############################################################################
matt@zope.com's avatar
matt@zope.com committed
2
#
Guido van Rossum's avatar
Guido van Rossum committed
3 4
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
5
#
matt@zope.com's avatar
matt@zope.com committed
6 7 8 9 10 11
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
12
#
Jim Fulton's avatar
alpha1  
Jim Fulton committed
13 14 15
##############################################################################
"""Transaction management

Jeremy Hylton's avatar
Jeremy Hylton committed
16
$Id: Transaction.py,v 1.44 2002/12/02 22:22:38 jeremy Exp $
17
"""
Jim Fulton's avatar
alpha1  
Jim Fulton committed
18

19
import time, sys, struct, POSException
Jim Fulton's avatar
alpha1  
Jim Fulton committed
20 21
from struct import pack
from string import split, strip, join
22
from zLOG import LOG, ERROR, PANIC, INFO, BLATHER, WARNING
Jim Fulton's avatar
Jim Fulton committed
23
from POSException import ConflictError
24
from ZODB import utils
Jim Fulton's avatar
alpha1  
Jim Fulton committed
25

26 27 28
# Flag indicating whether certain errors have occurred.
# Transaction._finish_many() sets it to 1 when tpc_finish() fails while
# finishing a list of jars; once it is set, Transaction.commit() raises
# TransactionError(hosed_msg) whenever there is anything to commit.
hosed=0

29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
# There is an order imposed on all jars, based on the storages they
# serve, that must be consistent across all applications using the
# storages.  The order is defined by the sortKey() method of the jar.

def _jar_sort_key(jar):
    """Return jar.sortKey(); on any failure log a warning and use id(jar)."""
    # sortKey() is called on every comparison, because a ZEO client
    # could reconnect to a different server at any time.
    try:
        return jar.sortKey()
    except:
        LOG("TM", WARNING, "jar missing sortKey() method: %s" % jar)
        return id(jar)


def jar_cmp(j1, j2):
    """Compare two jars by their sort keys, cmp()-style.

    Returns a negative, zero or positive number, which makes this
    usable as the comparison function of list.sort().  A jar whose
    sortKey() call fails is ordered by its id() instead, after a
    warning has been logged.
    """
    k1 = _jar_sort_key(j1)
    k2 = _jar_sort_key(j2)
    return cmp(k1, k2)

Jim Fulton's avatar
alpha1  
Jim Fulton committed
50
class Transaction:
51 52 53 54 55
    # Class-level defaults; instances shadow them when values are assigned.

    # Transaction metadata, set through setUser(), note() and begin().
    user = ''
    description = ''
    # When an instance carries a true value here, _init() closes every
    # value of the mapping and deletes the instance attribute again.
    _connections = None
    # Dict of extended info, created on first use by setExtendedInfo().
    _extension = None
    # Subtransaction flag: None until a subtransaction has been committed,
    # then a dict mapping id(jar) -> jar for the jars that have commit_sub().
    _sub = None # This is a subtransaction flag

    # The _non_st_objects variable is either None or a list of the
    # registered objects whose jars do not support subtransactions
    # (i.e. have no commit_sub()). This is used to manage
    # non-subtransaction-supporting jars during subtransaction
    # commits and aborts to ensure that they are correctly committed
    # or aborted in the "outside" transaction.
    _non_st_objects=None
63

64 65
    def __init__(self, id=None):
        self._id=id
Jim Fulton's avatar
alpha1  
Jim Fulton committed
66 67
        self._objects=[]
        self._append=self._objects.append
68 69 70 71 72

    def _init(self):
        self._objects=[]
        self._append=self._objects.append
        self.user=self.description=''
Jim Fulton's avatar
alpha1  
Jim Fulton committed
73 74 75
        if self._connections:
            for c in self._connections.values(): c.close()
            del self._connections
76

77 78 79
    def log(self, msg, level=INFO, error=None):
        """Send msg to LOG, tagged with this transaction's id.

        level defaults to INFO; error, when given, is passed through
        to LOG unchanged (callers pass sys.exc_info()).
        """
        tag = "TM:%s" % self._id
        LOG(tag, level, msg, error=error)

80
    def sub(self):
Jim Fulton's avatar
Jim Fulton committed
81
        # Create a manually managed subtransaction for internal use
82 83 84
        r=self.__class__()
        r.user=self.user
        r.description=self.description
85
        r._extension=self._extension
86
        return r
87

88 89 90 91 92
    def __str__(self):
        if self._id is None:
            return "Transaction user=%s" % `self.user`
        else:
            return "Transaction thread=%s user=%s" % (self._id, `self.user`)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
93

94
    def __del__(self):
Jeremy Hylton's avatar
Jeremy Hylton committed
95 96
        if self._objects:
            self.abort(freeme=0)
97

98
    def abort(self, subtransaction=0, freeme=1):
        """Abort the transaction.

        This is called from the application.  This means that we haven't
        entered two-phase commit yet, so no tpc_ messages are sent.

        subtransaction -- true to abort only the current subtransaction.
        freeme -- for a top-level abort with a true freeme,
            free_transaction() is called if this transaction has an id;
            in every other case with a false freeme or a true
            subtransaction, _init() is called instead.

        Raises POSException.TransactionError when a subtransaction abort
        is requested while objects of non-subtransaction-aware jars are
        pending.  Otherwise the first exception raised while aborting an
        object is re-raised after all objects have been processed; later
        failures are only logged.
        """
        if subtransaction and (self._non_st_objects is not None):
            raise POSException.TransactionError, (
                """Attempted to abort a sub-transaction, but a participating
                data manager doesn't support partial abort.
                """)

        # t/v/tb hold the first exception seen while aborting objects;
        # v and tb are only bound once t is no longer None.
        t = None

        if not subtransaction:
            # Must add in any non-subtransaction supporting objects that
            # may have been stowed away from previous subtransaction
            # commits.
            if self._non_st_objects is not None:
                self._objects.extend(self._non_st_objects)
                self._non_st_objects = None

            if self._sub is not None:
                # Abort of top-level transaction after committing
                # subtransactions.
                subjars = self._sub.values()
                subjars.sort(jar_cmp)
                self._sub = None
            else:
                subjars = []

        try:
            # Abort the objects
            for o in self._objects:
                try:
                    # A registered object is either a persistent object
                    # (with a _p_jar) or a jar itself.
                    j = getattr(o, '_p_jar', o)
                    if j is not None:
                        j.abort(o, self)
                except:
                    # Record the first exception that occurred
                    if t is None:
                        t, v, tb = sys.exc_info()
                    else:
                        # NOTE(review): this reads o._p_oid, which would
                        # itself raise for an object lacking that
                        # attribute -- confirm this cannot happen.
                        self.log("Failed to abort object %016x" %
                                 utils.U64(o._p_oid), error=sys.exc_info())

            # tpc_begin() was never called, so tpc_abort() should not be
            # called.

            if not subtransaction:
                # abort_sub() must be called to clear subtransaction state
                for jar in subjars:
                    jar.abort_sub(self) # This should never fail

            if t is not None:
                raise t, v, tb

        finally:
            if t is not None:
                del tb # don't keep traceback in local variable
            del self._objects[:] # Clear registered
            if not subtransaction and freeme:
                if self._id is not None:
                    free_transaction()
            else:
                self._init()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
164

165
    def begin(self, info=None, subtransaction=None):
166
        """Begin a new transaction.
Jim Fulton's avatar
alpha1  
Jim Fulton committed
167 168

        This aborts any transaction in progres.
169
        """
170 171
        if self._objects:
            self.abort(subtransaction, 0)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
172 173 174
        if info:
            info=split(info,'\t')
            self.user=strip(info[0])
175
            self.description=strip(join(info[1:],'\t'))
Jim Fulton's avatar
alpha1  
Jim Fulton committed
176

177
    def commit(self, subtransaction=None):
178
        ""Finalize the transaction."""
Jeremy Hylton's avatar
Jeremy Hylton committed
179
        objects = self._objects
180

181
        subjars = []
182
        if subtransaction:
183 184 185 186
            if self._sub is None:
                # Must store state across multiple subtransactions
                # so that the final commit can commit all subjars.
                self._sub = {}
Jim Fulton's avatar
Jim Fulton committed
187
        else:
188 189 190 191 192
            if self._sub is not None:
                # This commit is for a top-level transaction that
                # has previously committed subtransactions.  Do
                # one last subtransaction commit to clear out the
                # current objects, then commit all the subjars.
Jim Fulton's avatar
Jim Fulton committed
193 194
                if objects:
                    self.commit(1)
Jeremy Hylton's avatar
Jeremy Hylton committed
195
                    objects = []
196 197
                subjars = self._sub.values()
                subjars.sort(jar_cmp)
Jeremy Hylton's avatar
Jeremy Hylton committed
198
                self._sub = None
199

200 201 202 203 204 205
                # If there were any non-subtransaction-aware jars
                # involved in earlier subtransaction commits, we need
                # to add them to the list of jars to commit.
                if self._non_st_objects is not None:
                    objects.extend(self._non_st_objects)
                    self._non_st_objects = None
Jim Fulton's avatar
Jim Fulton committed
206

207 208 209
        if (objects or subjars) and hosed:
            # Something really bad happened and we don't
            # trust the system state.
Jeremy Hylton's avatar
Jeremy Hylton committed
210 211 212 213 214 215 216 217 218 219 220 221 222 223
            raise POSException.TransactionError, hosed_msg

        # It's important that:
        #
        # - Every object in self._objects is either committed or
        #   aborted.
        #
        # - For each object that is committed we call tpc_begin on
        #   it's jar at least once
        #
        # - For every jar for which we've called tpc_begin on, we
        #   either call tpc_abort or tpc_finish. It is OK to call
        #   these multiple times, as the storage is required to ignore
        #   these calls if tpc_begin has not been called.
224 225 226 227
        #
        # - That we call tpc_begin() in a globally consistent order,
        #   so that concurrent transactions involving multiple storages
        #   do not deadlock.
Jim Fulton's avatar
alpha1  
Jim Fulton committed
228
        try:
Jeremy Hylton's avatar
Jeremy Hylton committed
229
            ncommitted = 0
230
            jars = self._get_jars(objects, subtransaction)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
231
            try:
232 233 234 235 236 237 238 239
                # If not subtransaction, then jars will be modified.
                self._commit_begin(jars, subjars, subtransaction)
                ncommitted += self._commit_objects(objects)
                if not subtransaction:
                    # Unless this is a really old jar that doesn't
                    # implement tpc_vote(), it must raise an exception
                    # if it can't commit the transaction.
                    for jar in jars:
Jeremy Hylton's avatar
Jeremy Hylton committed
240 241
                        try:
                            vote = jar.tpc_vote
242
                        except AttributeError:
Jeremy Hylton's avatar
Jeremy Hylton committed
243 244
                            pass
                        else:
245
                            vote(self)
Jeremy Hylton's avatar
Jeremy Hylton committed
246

247 248 249
                # Handle multiple jars separately.  If there are
                # multiple jars and one fails during the finish, we
                # mark this transaction manager as hosed.
250 251
                if len(jars) == 1:
                    self._finish_one(jars[0])
252
                else:
253
                    self._finish_many(jars)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
254
            except:
255
                # Ugh, we got an got an error during commit, so we
256 257 258 259 260 261 262 263 264 265 266 267 268
                # have to clean up.  First save the original exception
                # in case the cleanup process causes another
                # exception.
                t, v, tb = sys.exc_info()
                try:
                    self._commit_error(objects, ncommitted, jars, subjars)
                except:
                    LOG('ZODB', ERROR,
                        "A storage error occured during transaction "
                        "abort.  This shouldn't happen.",
                        error=sys.exc_info())
                    
                raise t, v, tb
Jim Fulton's avatar
alpha1  
Jim Fulton committed
269
        finally:
270
            del objects[:] # clear registered
271 272
            if not subtransaction and self._id is not None:
                free_transaction()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
273

274 275 276 277 278 279 280 281 282
    def _get_jars(self, objects, subtransaction):
        """Return the distinct jars of objects, sorted with jar_cmp.

        objects is a list of persistent objects and jars; an entry
        without a _p_jar attribute is taken to be a jar itself.  When
        subtransaction is true, each jar is also recorded for the
        parent transaction: jars with commit_sub() go into self._sub,
        and objects of any other jar are queued in
        self._non_st_objects until the parent transaction commits.
        """
        unique = {}
        for obj in objects:
            jar = getattr(obj, '_p_jar', obj)
            if jar is None:
                # I don't think this should ever happen, but can't
                # prove that it won't.  If there is no jar, there
                # is nothing to be done.
                self.log("Object with no jar registered for transaction: "
                         "%s" % repr(obj), level=BLATHER)
                continue

            # jar may not be safe as a dictionary key
            jar_id = id(jar)
            unique[jar_id] = jar

            if not subtransaction:
                continue
            if hasattr(jar, "commit_sub"):
                self._sub[jar_id] = jar
            elif self._non_st_objects is None:
                self._non_st_objects = [obj]
            else:
                self._non_st_objects.append(obj)

        ordered = unique.values()
        ordered.sort(jar_cmp)
        return ordered

    def _commit_begin(self, jars, subjars, subtransaction):
        if subtransaction:
            assert not subjars
            for jar in jars:
                try:
                    jar.tpc_begin(self, subtransaction)
                except TypeError:
                    # Assume that TypeError means that tpc_begin() only
                    # takes one argument, and that the jar doesn't
                    # support subtransactions.
                    jar.tpc_begin(self)
        else:
            # Merge in all the jars used by one of the subtransactions.

            # When the top-level subtransaction commits, the tm must
            # call commit_sub() for each jar involved in one of the
            # subtransactions.  The commit_sub() method should call
            # tpc_begin() on the storage object.

            # It must also call tpc_begin() on jars that were used in
            # a subtransaction but don't support subtransactions.

            # These operations must be performed on the jars in order.

            # Modify jars inplace to include the subjars, too.
            jars += subjars
            jars.sort(jar_cmp)
            # assume that subjars is small, so that it's cheaper to test
            # whether jar in subjars than to make a dict and do has_key.
            for jar in jars:
                if jar in subjars:
                    jar.commit_sub(self)
                else:
                    jar.tpc_begin(self)

    def _commit_objects(self, objects):
        ncommitted = 0
        for o in objects:
            jar = getattr(o, "_p_jar", o)
            if jar is None:
                continue
            jar.commit(o, self)
Jeremy Hylton's avatar
Jeremy Hylton committed
351 352 353
            ncommitted += 1
        return ncommitted

354
    def _finish_one(self, jar):
Jeremy Hylton's avatar
Jeremy Hylton committed
355
        try:
356 357
            # The database can't guarantee consistency if call fails.
            jar.tpc_finish(self)
Jeremy Hylton's avatar
Jeremy Hylton committed
358 359 360 361 362 363 364 365
        except:
            # Bug if it does, we need to keep track of it
            LOG('ZODB', ERROR,
                "A storage error occurred in the last phase of a "
                "two-phase commit.  This shouldn\'t happen. ",
                error=sys.exc_info())
            raise

366
    def _finish_many(self, jars):
Jeremy Hylton's avatar
Jeremy Hylton committed
367 368
        global hosed
        try:
369 370 371
            for jar in jars:
                # The database can't guarantee consistency if call fails.
                jar.tpc_finish(self)
372
        except:
Jeremy Hylton's avatar
Jeremy Hylton committed
373 374 375 376
            hosed = 1
            LOG('ZODB', PANIC,
                "A storage error occurred in the last phase of a "
                "two-phase commit.  This shouldn\'t happen. "
377
                "The application will not be allowed to commit "
Jeremy Hylton's avatar
Jeremy Hylton committed
378 379 380
                "until the site/storage is reset by a restart. ",
                error=sys.exc_info())
            raise
381

382 383 384 385
    def _commit_error(self, objects, ncommitted, jars, subjars):
        # First, we have to abort any uncommitted objects.  The abort
        # will mark the object for invalidation, so that it's last
        # committed state will be restored.
Jeremy Hylton's avatar
Jeremy Hylton committed
386 387 388 389 390 391
        for o in objects[ncommitted:]:
            try:
                j = getattr(o, '_p_jar', o)
                if j is not None:
                    j.abort(o, self)
            except:
392 393 394 395 396 397 398 399
                # nothing to do but log the error
                self.log("Failed to abort object %016x" % utils.U64(o._p_oid),
                         error=sys.exc_info())

        # Abort the two-phase commit.  It's only necessary to abort the
        # commit for jars that began it, but it is harmless to abort it
        # for all.
        for j in jars:
Jeremy Hylton's avatar
Jeremy Hylton committed
400 401
            try:
                j.tpc_abort(self) # This should never fail
402
            except:
Jeremy Hylton's avatar
Jeremy Hylton committed
403 404 405
                LOG('ZODB', ERROR,
                    "A storage error occured during object abort. This "
                    "shouldn't happen. ", error=sys.exc_info())
406

407 408 409 410 411 412 413 414
        # After the tpc_abort(), call abort_sub() on all the
        # subtrans-aware jars to *really* abort the subtransaction.
        
        # Example: For Connection(), the tpc_abort() will abort the
        # subtransaction TmpStore() and abort_sub() will remove the
        # TmpStore.

        for j in subjars:
Jeremy Hylton's avatar
Jeremy Hylton committed
415 416 417 418 419 420 421 422
            try:
                j.abort_sub(self) # This should never fail
            except:
                LOG('ZODB', ERROR,
                    "A storage error occured during sub-transaction "
                    "object abort.  This shouldn't happen.",
                    error=sys.exc_info())

Jim Fulton's avatar
alpha1  
Jim Fulton committed
423 424 425 426
    def register(self,object):
        'Register the given object for transaction control.'
        self._append(object)

Jim Fulton's avatar
Jim Fulton committed
427
    def note(self, text):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
428 429
        if self.description:
            self.description = "%s\n\n%s" % (self.description, strip(text))
430
        else:
Jim Fulton's avatar
alpha1  
Jim Fulton committed
431
            self.description = strip(text)
432

Jim Fulton's avatar
alpha1  
Jim Fulton committed
433 434
    def setUser(self, user_name, path='/'):
        self.user="%s %s" % (path, user_name)
Jim Fulton's avatar
Jim Fulton committed
435 436 437 438 439 440

    def setExtendedInfo(self, name, value):
        ext=self._extension
        if ext is None:
            ext=self._extension={}
        ext[name]=value
Jim Fulton's avatar
alpha1  
Jim Fulton committed
441

Jeremy Hylton's avatar
Jeremy Hylton committed
442 443 444 445 446 447 448 449 450 451 452 453
# Message of the TransactionError that Transaction.commit() raises once
# the module-level hosed flag has been set.
hosed_msg = \
"""A serious error, which was probably a system error,
occurred in a previous database transaction.  This
application may be in an invalid state and must be
restarted before database updates can be allowed.

Beware though that if the error was due to a serious
system problem, such as a disk full condition, then
the application may not come up until you deal with
the system problem.  See your application log for
information on the error that lead to this problem.
"""
454

Jim Fulton's avatar
alpha1  
Jim Fulton committed
455 456
############################################################################
# install get_transaction:
Jim Fulton's avatar
Jim Fulton committed
457

Jim Fulton's avatar
alpha1  
Jim Fulton committed
458 459
try:
    import thread

except:
    # The thread module could not be imported: one Transaction object
    # serves the whole process.
    _t = Transaction(None)

    def get_transaction(_t=_t):
        """Return the single, process-wide transaction."""
        return _t

    def free_transaction(_t=_t):
        """Re-initialize the process-wide transaction in place."""
        _t.__init__()

else:
    # One Transaction per thread, keyed by thread identifier.  The
    # registry is bound through default arguments because the
    # module-level names _t and thread are deleted below.
    _t = {}

    def get_transaction(_id=thread.get_ident, _t=_t, get=_t.get):
        """Return the calling thread's transaction, creating it if needed."""
        ident = _id()
        txn = get(ident, None)
        if txn is None:
            txn = Transaction(ident)
            _t[ident] = txn
        return txn

    def free_transaction(_id=thread.get_ident, _t=_t):
        """Forget the calling thread's transaction, if it has one."""
        ident = _id()
        try:
            del _t[ident]
        except KeyError:
            pass

    del thread

del _t
Jim Fulton's avatar
Jim Fulton committed
490

491 492 493
# Install get_transaction() in the __builtin__ module, which makes it
# available as a builtin name; the module reference is dropped
# again right away.
import __builtin__
__builtin__.get_transaction=get_transaction
del __builtin__