Commit 6cd3100c authored by Fred Drake's avatar Fred Drake

parent 25cdb974
This package is currently a facade of the ZODB.Transaction module.
It exists to support:
- Application code that uses the ZODB 4 transaction API
- ZODB4-style data managers (transaction.interfaces.IDataManager)
Note that the data manager API, transaction.interfaces.IDataManager,
is syntactically simple, but semantically complex. The semantics
were not easy to express in the interface. This could probably use
more work. The semantics are presented in detail through examples of
a sample data manager in transaction.tests.test_SampleDataManager.
##############################################################################
############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# Copyright (c) 2001, 2002, 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
......@@ -10,12 +10,7 @@
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
############################################################################
from transaction.manager import ThreadedTransactionManager
from ZODB.Transaction import get_transaction
_manager = ThreadedTransactionManager()
get_transaction = _manager.get
def set_factory(factory):
_manager.txn_factory = factory
......@@ -11,131 +11,107 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.interface import Interface
class TransactionError(StandardError):
"""An error occured due to normal transaction processing."""
try:
from zope.interface import Interface
except ImportError:
class Interface:
pass
class ConflictError(TransactionError):
"""Two transactions tried to modify the same object at once
This transaction should be resubmitted.
"""
class IllegalStateError(TransactionError):
"""An operation was invoked that wasn't valid in the current
transaction state.
"""
def __init__(self, verb, state):
self._verb = verb
self._state = state
def __str__(self):
return "Can't %s transaction in %s state" % (self._verb,
self._state)
class AbortError(TransactionError):
"""Transaction commit failed and the application must abort."""
class IDataManager(Interface):
"""Data management interface for storing objects transactionally
def __init__(self, datamgr):
self.datamgr = datamgr
This is currently implemented by ZODB database connections.
def __str__(self):
str = self.__class__.__doc__ + " Failed data manager: %s"
return str % self.datamgr
XXX This exists to document ZODB4 behavior, to help create some
backward-compatability support for Zope 3. New classes shouldn't
implement this. They should implement ZODB.interfaces.IDataManager
for now. Our hope is that there will eventually be an interface
like this or that this interface will evolve and become the
standard interface. There are some issues to be resolved first, like:
class RollbackError(TransactionError):
"""An error occurred rolling back a savepoint."""
- Probably want separate abort methods for use in and out of
two-phase commit.
class IDataManager(Interface):
"""Data management interface for storing objects transactionally
- The savepoint api may need some more thought.
This is currently implemented by ZODB database connections.
"""
def prepare(transaction):
"""Begin two-phase commit of a transaction.
"""Perform the first phase of a 2-phase commit
The data manager prepares for commit any changes to be made
persistent. A normal return from this method indicated that
the data manager is ready to commit the changes.
The data manager must raise an exception if it is not prepared
to commit the transaction after executing prepare().
The transaction must match that used for preceeding
savepoints, if any.
"""
# This is equivalent to zodb3's tpc_begin, commit, and
# tpc_vote combined.
def abort(transaction):
"""Abort changes made by transaction."""
"""Abort changes made by transaction
def commit(transaction):
"""Commit changes made by transaction."""
This may be called before two-phase commit or in the second
phase of two-phase commit.
def savepoint(transaction):
"""Do tentative commit of changes to this point.
The transaction must match that used for preceeding
savepoints, if any.
Should return an object implementing IRollback
"""
class IRollback(Interface):
# This is equivalent to *both* zodb3's abort and tpc_abort
# calls. This should probably be split into 2 methods.
def rollback():
"""Rollback changes since savepoint."""
def commit(transaction):
"""Finish two-phase commit
class ITransaction(Interface):
"""Transaction objects
The prepare method must be called, with the same transaction,
before calling commit.
Application code typically gets these by calling
get_transaction().
"""
def abort():
"""Abort the current transaction."""
def begin():
"""Begin a transaction."""
# This is equivalent to zodb3's tpc_finish
def commit():
"""Commit a transaction."""
def savepoint(transaction):
"""Do tentative commit of changes to this point.
def join(resource):
"""Join a resource manager to the current transaction."""
Should return an object implementing IRollback that can be used
to rollback to the savepoint.
def status():
"""Return status of the current transaction."""
Note that (unlike zodb3) this doesn't use a 2-phase commit
protocol. If this call fails, or if a rollback call on the
result fails, the (containing) transaction should be
aborted. Aborting the containing transaction is *not* the
responsibility of the data manager, however.
def suspend():
"""Suspend the current transaction.
An implementation that doesn't support savepoints should
implement this method by returning a rollback implementation
that always raises an error when it's rollback method is
called. The savepoing method shouldn't raise an error. This
way, transactions that create savepoints can proceed as long
as an attempt is never made to roll back a savepoint.
If a transaction is suspended, the transaction manager no
longer treats it as active. The resume() method must be
called before the transaction can be used.
"""
def resume():
"""Resume the current transaction.
If another transaction is active, it must be suspended before
resume() is called.
"""
class IRollback(Interface):
class ITransactionManager(Interface):
"""Coordinates application use of transactional resources."""
def rollback():
"""Rollback changes since savepoint.
def get():
"""Return the curren transaction.
IOW, rollback to the last savepoint.
Calls new() to start a new transaction if one does not exist.
"""
It is an error to rollback to a savepoint if:
def begin():
"""Return a new transaction.
- An earlier savepoint within the same transaction has been
rolled back to, or
If a transaction is currently active for the calling thread,
it is aborted.
- The transaction has ended.
"""
def commit(txn):
"""Commit txn."""
def abort(txn):
"""Abort txn."""
def savepoint(txn):
"""Return rollback object that can restore txn to current state."""
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import logging
import sys
from zope.interface import implements
from transaction.interfaces import *
from transaction.txn import Transaction, Status, Set
# XXX need to change asserts of transaction status into explicit checks
# that raise some exception
# XXX need lots of error checking
class AbstractTransactionManager(object):
# base class to provide commit logic
# concrete class must provide logger attribute
txn_factory = Transaction
# XXX the methods below use assertions, but perhaps they should
# check errors. on the other hand, the transaction instances
# do raise exceptions.
def commit(self, txn):
# commit calls _finishCommit() or abort()
assert txn._status is Status.ACTIVE
txn._status = Status.PREPARING
self.logger.debug("%s: prepare", txn)
try:
for r in txn._resources:
r.prepare(txn)
except:
txn._status = Status.FAILED
raise
txn._status = Status.PREPARED
self._finishCommit(txn)
def _finishCommit(self, txn):
self.logger.debug("%s: commit", txn)
try:
for r in txn._resources:
r.commit(txn)
txn._status = Status.COMMITTED
except:
# An error occured during the second phase of 2PC. We can
# no longer guarantee the system is in a consistent state.
# The best we can do is abort() all the resource managers
# that haven't already committed and hope for the best.
error = sys.exc_info()
txn._status = Status.FAILED
self.abort(txn)
msg = ("Transaction failed during second phase of two-"
"phase commit")
self.logger.critical(msg, exc_info=error)
raise TransactionError(msg)
def abort(self, txn):
self.logger.debug("%s: abort", txn)
assert txn._status in (Status.ACTIVE, Status.PREPARED, Status.FAILED,
Status.ABORTED)
txn._status = Status.PREPARING
for r in txn._resources:
r.abort(txn)
txn._status = Status.ABORTED
def savepoint(self, txn):
assert txn._status == Status.ACTIVE
self.logger.debug("%s: savepoint", txn)
return Rollback(txn, [r.savepoint(txn) for r in txn._resources])
class TransactionManager(AbstractTransactionManager):
implements(ITransactionManager)
def __init__(self):
self.logger = logging.getLogger("txn")
self._current = None
self._suspended = Set()
def get(self):
if self._current is None:
self._current = self.begin()
return self._current
def begin(self):
if self._current is not None:
self._current.abort()
self._current = self.txn_factory(self)
self.logger.debug("%s: begin", self._current)
return self._current
def commit(self, txn):
super(TransactionManager, self).commit(txn)
self._current = None
def abort(self, txn):
super(TransactionManager, self).abort(txn)
self._current = None
def suspend(self, txn):
if self._current != txn:
raise TransactionError("Can't suspend transaction because "
"it is not active")
self._suspended.add(txn)
self._current = None
def resume(self, txn):
if self._current is not None:
raise TransactionError("Can't resume while other "
"transaction is active")
self._suspended.remove(txn)
self._current = txn
class Rollback(object):
implements(IRollback)
def __init__(self, txn, resources):
self._txn = txn
self._resources = resources
def rollback(self):
if self._txn.status() != Status.ACTIVE:
raise IllegalStateError("rollback", self._txn.status())
for r in self._resources:
r.rollback()
# make the transaction manager visible to client code
import thread
class ThreadedTransactionManager(AbstractTransactionManager):
# XXX Do we need locking on _pool or _suspend?
# Most methods read and write pool based on the id of the current
# thread, so they should never interfere with each other.
# The suspend() and resume() methods modify the _suspend set,
# but suspend() only adds a new thread. The resume() method
# does need a lock to prevent two different threads from resuming
# the same transaction.
implements(ITransactionManager)
def __init__(self):
self.logger = logging.getLogger("txn")
self._pool = {}
self._suspend = Set()
self._lock = thread.allocate_lock()
def get(self):
tid = thread.get_ident()
txn = self._pool.get(tid)
if txn is None:
txn = self.begin()
return txn
def begin(self):
tid = thread.get_ident()
txn = self._pool.get(tid)
if txn is not None:
txn.abort()
txn = self.txn_factory(self)
self._pool[tid] = txn
return txn
def _finishCommit(self, txn):
tid = thread.get_ident()
assert self._pool[tid] is txn
super(ThreadedTransactionManager, self)._finishCommit(txn)
del self._pool[tid]
def abort(self, txn):
tid = thread.get_ident()
assert self._pool[tid] is txn
super(ThreadedTransactionManager, self).abort(txn)
del self._pool[tid]
# XXX should we require that the transaction calling suspend()
# be the one that is using the transaction?
# XXX need to add locking to suspend() and resume()
def suspend(self, txn):
tid = thread.get_ident()
if self._pool.get(tid) is txn:
self._suspend.add(txn)
del self._pool[tid]
else:
raise TransactionError("txn %s not owned by thread %s" %
(txn, tid))
def resume(self, txn):
tid = thread.get_ident()
if self._pool.get(tid) is not None:
raise TransactionError("thread %s already has transaction" %
tid)
if txn not in self._suspend:
raise TransactionError("unknown transaction: %s" % txn)
self._suspend.remove(txn)
self._pool[tid] = txn
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
# Make this directory a package
......@@ -27,10 +27,11 @@ the data actually gets written/not written to the storge.
Obviously this test suite should be expanded.
$Id: abstestIDataManager.py,v 1.3 2003/05/01 19:34:57 faassen Exp $
$Id: abstestIDataManager.py,v 1.4 2004/02/20 16:56:57 fdrake Exp $
"""
from unittest import TestCase
from transaction.interfaces import IRollback
class IDataManagerTests(TestCase, object):
......@@ -59,5 +60,4 @@ class IDataManagerTests(TestCase, object):
def testRollback(self):
tran = self.get_transaction()
rb = self.datamgr.savepoint(tran)
if rb is not None:
rb.rollback()
self.assert_(IRollback.isImplementedBy(rb))
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Basic tests of the transaction manager."""
import unittest
from transaction.interfaces import *
from transaction.manager import TransactionManager, ThreadedTransactionManager
from transaction.txn import Status
from zope.testing.loghandler import Handler
from logging import CRITICAL
class TestDataManager:
def __init__(self, fail=None, vote=True):
# pass the name of a method that should fail as fail
self._fail = fail
# pass the return value for prepare as vote
self._vote = vote
def prepare(self, txn):
if self._fail == "prepare":
raise RuntimeError
def abort(self, txn):
if self._fail == "abort":
raise RuntimeError
def commit(self, txn):
if self._fail == "commit":
raise RuntimeError
def savepoint(self, txn):
if self._fail == "savepoint":
raise RuntimeError
# XXX should anything be done here?
class BaseTxnTests(unittest.TestCase):
def setUp(self):
self.manager = self.ManagerFactory()
self.handler = Handler(self)
self.handler.add("txn")
def tearDown(self):
self.handler.close()
def testBegin(self):
txn = self.manager.begin()
self.assertEqual(txn.status(), Status.ACTIVE)
txn2 = self.manager.get()
self.assertEqual(id(txn), id(txn2))
txn3 = self.manager.begin()
self.assert_(id(txn) != id(txn3))
self.assertEqual(txn.status(), Status.ABORTED)
# the trivial tests don't involve any resource managers
def testTrivialCommit(self):
txn = self.manager.begin()
txn.commit()
self.assertEqual(txn.status(), Status.COMMITTED)
self.assertRaises(IllegalStateError, txn.commit)
self.assertRaises(IllegalStateError, txn.savepoint)
self.assertRaises(IllegalStateError, txn.abort)
def testTrivialAbort(self):
txn = self.manager.begin()
txn.abort()
self.assertEqual(txn.status(), Status.ABORTED)
self.assertRaises(IllegalStateError, txn.commit)
self.assertRaises(IllegalStateError, txn.savepoint)
def testTrivialSavepoint(self):
txn = self.manager.begin()
r1 = txn.savepoint()
r2 = txn.savepoint()
r2.rollback()
txn.abort()
self.assertRaises(IllegalStateError, r2.rollback)
def testTrivialSuspendResume(self):
txn1 = self.manager.begin()
txn1.suspend()
self.assertRaises(TransactionError, txn1.suspend)
txn2 = self.manager.begin()
self.assert_(txn1 != txn2)
txn2.suspend()
txn1.resume()
txn1.commit()
self.assertRaises(TransactionError, txn1.suspend)
txn2.resume()
txn2.abort()
self.assertRaises(TransactionError, txn2.suspend)
# XXX need a multi-threaded test of suspend / resume
# more complex tests use a simple data manager
def testCommit(self):
txn = self.manager.begin()
for i in range(3):
txn.join(TestDataManager())
txn.commit()
def testCommitPrepareException(self):
txn = self.manager.begin()
txn.join(TestDataManager())
txn.join(TestDataManager(fail="prepare"))
self.assertRaises(RuntimeError, txn.commit)
self.assertEqual(txn.status(), Status.FAILED)
txn.abort()
def testCommitFailure(self):
txn = self.manager.begin()
txn.join(TestDataManager())
txn.join(TestDataManager(fail="commit"))
self.assertRaises(TransactionError, txn.commit)
self.assertEqual(txn.status(), Status.ABORTED)
txn.abort()
self.handler.assertLogsMessage("Transaction failed during second "
"phase of two-phase commit",
level=CRITICAL)
class SimpleTxnTests(BaseTxnTests):
ManagerFactory = TransactionManager
class ThreadedTxnTests(BaseTxnTests):
ManagerFactory = ThreadedTransactionManager
def test_suite():
s = unittest.TestSuite()
for klass in SimpleTxnTests, ThreadedTxnTests:
s.addTest(unittest.makeSuite(klass))
return s
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test transaction utilities
$Id: test_util.py,v 1.2 2004/02/20 16:56:58 fdrake Exp $
"""
import unittest
from doctest import DocTestSuite
def test_suite():
return DocTestSuite('transaction.util')
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
__metaclass__ = type
from threading import Lock
from transaction.interfaces import *
from zope.interface import implements
try:
from sets import Set
except ImportError:
class Set(dict):
def add(self, k):
self[k] = 1
def remove(self, k):
del self[k]
class Status:
ACTIVE = "Active"
PREPARING = "Preparing"
PREPARED = "Prepared"
FAILED = "Failed"
COMMITTED = "Committed"
ABORTING = "Aborting"
ABORTED = "Aborted"
SUSPENDED = "Suspended"
class Transaction:
implements(ITransaction)
def __init__(self, manager=None, parent=None):
self._manager = manager
self._parent = parent
self._status = Status.ACTIVE
self._suspend = None
self._resources = Set()
self._lock = Lock()
def __repr__(self):
return "<%s %X %s>" % (self.__class__.__name__, id(self), self._status)
def begin(self, parent=None):
"""Begin a transaction.
If parent is not None, it is the parent transaction for this one.
"""
assert self._manager is not None
if parent is not None:
t = Transaction(self._manager, self)
return t
def commit(self):
"""Commit a transaction."""
assert self._manager is not None
if self._status != Status.ACTIVE:
raise IllegalStateError("commit", self._status)
self._manager.commit(self)
def abort(self):
"""Rollback to initial state."""
assert self._manager is not None
if self._status == Status.ABORTED:
return
if self._status not in (Status.ACTIVE, Status.PREPARED, Status.FAILED):
raise IllegalStateError("abort", self._status)
self._manager.abort(self)
def savepoint(self):
"""Save current progress and return a savepoint."""
assert self._manager is not None
if self._status != Status.ACTIVE:
raise IllegalStateError("create savepoint", self._status)
return self._manager.savepoint(self)
def join(self, resource):
"""resource is participating in the transaction."""
assert self._manager is not None
if self._status != Status.ACTIVE:
raise IllegalStateError("join", self._status)
self._manager.logger.debug("%s join %s" % (self, resource))
self._resources.add(resource)
def status(self):
"""Return the status of the transaction."""
return self._status
def suspend(self):
self._lock.acquire()
try:
if self._status == Status.SUSPENDED:
raise IllegalStateError("suspend", self._status)
self._manager.suspend(self)
self._suspend = self._status
self._status = Status.SUSPENDED
finally:
self._lock.release()
def resume(self):
self._lock.acquire()
try:
if self._status != Status.SUSPENDED:
raise TransactionError("Can only resume suspended transaction")
self._manager.resume(self)
self._status = self._suspend
self._suspend = None
finally:
self._lock.release()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Utility classes or functions
$Id: util.py,v 1.2 2004/02/20 16:56:56 fdrake Exp $
"""
from transaction.interfaces import IRollback
try:
from zope.interface import implements
except ImportError:
def implements(*args):
pass
class NoSavepointSupportRollback:
"""Rollback for data managers that don't support savepoints
>>> class DataManager:
... def savepoint(self, txn):
... return NoSavepointSupportRollback(self)
>>> rb = DataManager().savepoint('some transaction')
>>> rb.rollback()
Traceback (most recent call last):
...
NotImplementedError: """ \
"""DataManager data managers do not support """ \
"""savepoints (aka subtransactions
"""
implements(IRollback)
def __init__(self, dm):
self.dm = dm.__class__.__name__
def rollback(self):
raise NotImplementedError(
"%s data managers do not support savepoints (aka subtransactions"
% self.dm)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment