Commit eea28a70 authored by Jeremy Hylton's avatar Jeremy Hylton

Catch errors that current during 2nd phase of 2PC.

It's not clear what the right answer is here.  The implementation
aborts any remaining resource managers and hopes for the best, which
isn't very satisfying.  Perhaps the hosed feature should come back.

Add some comments about the issues involved.  Revise the tests to
cover the current behavior.
parent 05aa51a9
import logging
import sys
from transaction.interfaces import *
from transaction.txn import Transaction, Status, Set
......@@ -38,14 +39,28 @@ class AbstractTransactionManager(object):
def _finishCommit(self, txn):
self.logger.debug("%s: commit", txn)
# finish the two-phase commit
for r in txn._resources:
r.commit(txn)
txn._status = Status.COMMITTED
try:
for r in txn._resources:
r.commit(txn)
txn._status = Status.COMMITTED
except:
# An error occured during the second phase of 2PC. We can
# no longer guarantee the system is in a consistent state.
# The best we can do is abort() all the resource managers
# that haven't already committed and hope for the best.
error = sys.exc_info()
txn._status = Status.FAILED
self.abort(txn)
msg = ("Transaction failed during second phase of two-"
"phase commmit")
self.logger.critical(msg, exc_info=error)
raise TransactionError("Transaction failed during second "
"phase of two-phase commit")
def abort(self, txn):
self.logger.debug("%s: abort", txn)
assert txn._status in (Status.ACTIVE, Status.PREPARED, Status.FAILED)
assert txn._status in (Status.ACTIVE, Status.PREPARED, Status.FAILED,
Status.ABORTED)
txn._status = Status.PREPARING
for r in txn._resources:
r.abort(txn)
......
......@@ -80,7 +80,6 @@ class BaseTxnTests(unittest.TestCase):
self.assertEqual(txn.status(), Status.ABORTED)
self.assertRaises(IllegalStateError, txn.commit)
self.assertRaises(IllegalStateError, txn.savepoint)
self.assertRaises(IllegalStateError, txn.abort)
def testTrivialSavepoint(self):
txn = self.manager.begin()
......@@ -131,6 +130,14 @@ class BaseTxnTests(unittest.TestCase):
self.assertRaises(IllegalStateError, txn.commit)
txn.abort()
def testCommitFailure(self):
txn = self.manager.begin()
txn.join(TestDataManager())
txn.join(TestDataManager(fail="commit"))
self.assertRaises(TransactionError, txn.commit)
self.assertEqual(txn.status(), Status.ABORTED)
txn.abort()
class SimpleTxnTests(BaseTxnTests):
ManagerFactory = TransactionManager
......
......@@ -64,6 +64,8 @@ class Transaction:
def abort(self):
"""Rollback to initial state."""
assert self._manager is not None
if self._status == Status.ABORTED:
return
if self._status not in (Status.ACTIVE, Status.PREPARED, Status.FAILED):
raise IllegalStateError("abort", self._status)
self._manager.abort(self)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment