Commit e3f4fb77 authored by Tim Peters's avatar Tim Peters

Partial savepoint review.

Added more comments.  Changed some comments to English.
Renamed some attributes and methods for clarity.
Switched to using a Python WeakKeyDictionary instead of
rolling our own out of a Python dict and raw weakrefs.
parent 4fee6a4b
...@@ -331,7 +331,6 @@ class Connection(ExportImport, object): ...@@ -331,7 +331,6 @@ class Connection(ExportImport, object):
# they've been unadded. This will make the code in _abort # they've been unadded. This will make the code in _abort
# confused. # confused.
self._abort() self._abort()
if self._savepoint_storage is not None: if self._savepoint_storage is not None:
...@@ -353,9 +352,9 @@ class Connection(ExportImport, object): ...@@ -353,9 +352,9 @@ class Connection(ExportImport, object):
# Note: If we invalidate a non-ghostifiable object # Note: If we invalidate a non-ghostifiable object
# (i.e. a persistent class), the object will # (i.e. a persistent class), the object will
# immediately reread it's state. That means that the # immediately reread its state. That means that the
# following call could result in a call to # following call could result in a call to
# self.setstate, which, of course, must suceed. # self.setstate, which, of course, must succeed.
# In general, it would be better if the read could be # In general, it would be better if the read could be
# delayed until the start of the next transaction. If # delayed until the start of the next transaction. If
# we read at the end of a transaction and if the # we read at the end of a transaction and if the
...@@ -365,7 +364,7 @@ class Connection(ExportImport, object): ...@@ -365,7 +364,7 @@ class Connection(ExportImport, object):
# can only delay the read if this abort corresponds to # can only delay the read if this abort corresponds to
# a top-level-transaction abort. We can't tell if # a top-level-transaction abort. We can't tell if
# this is a top-level-transaction abort, so we have to # this is a top-level-transaction abort, so we have to
# go ahead and invalidate now. Fortunately, it's # go ahead and invalidate now. Fortunately, it's
# pretty unlikely that the object we are invalidating # pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a # was invalidated by another thread, so the risk of a
# reread is pretty low. # reread is pretty low.
...@@ -383,21 +382,20 @@ class Connection(ExportImport, object): ...@@ -383,21 +382,20 @@ class Connection(ExportImport, object):
def _flush_invalidations(self): def _flush_invalidations(self):
self._inv_lock.acquire() self._inv_lock.acquire()
try: try:
# Non-ghostifiable objects may need to read when they are # Non-ghostifiable objects may need to read when they are
# invalidated, so, we'll quickly just replace the # invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process # invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after # the invalidations after freeing the lock *and* after
# reseting the time. This means that invalidations will # resetting the time. This means that invalidations will
# happen after the start of the transactions. They are # happen after the start of the transactions. They are
# subject to conflict errors and to reading old data, # subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent # TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invlidation of a persistent # classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is # class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses # invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could # data read from the class, then the invalidation could
# be performed with state data. Or, suppose that there # be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of # are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__ # invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix # uses class data. Really, the only way to properly fix
...@@ -407,10 +405,10 @@ class Connection(ExportImport, object): ...@@ -407,10 +405,10 @@ class Connection(ExportImport, object):
# much worse than that though, because we'd also need to # much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need # deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that # to replace all of the slot operations with versions that
# reloaded the object when caled. It's hard to say which # reloaded the object when called. It's hard to say which
# is better for worse. For now, it seems the risk of # is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems # using a class while objects are being invalidated seems
# small enough t be acceptable. # small enough to be acceptable.
invalidated = self._invalidated invalidated = self._invalidated
self._invalidated = {} self._invalidated = {}
......
...@@ -30,7 +30,7 @@ registers its _p_jar attribute. TODO: explain adapter ...@@ -30,7 +30,7 @@ registers its _p_jar attribute. TODO: explain adapter
Subtransactions Subtransactions
--------------- ---------------
Note: Suntransactions are deprecated! Note: Suntransactions are deprecated! Use savepoint/rollback instead.
A subtransaction applies the transaction notion recursively. It A subtransaction applies the transaction notion recursively. It
allows a set of modifications within a transaction to be committed or allows a set of modifications within a transaction to be committed or
...@@ -189,17 +189,20 @@ class Transaction(object): ...@@ -189,17 +189,20 @@ class Transaction(object):
interfaces.ITransactionDeprecated) interfaces.ITransactionDeprecated)
# Assign an index to each savepoint so we can invalidate # Assign an index to each savepoint so we can invalidate later savepoints
# later savepoints on rollback # on rollback. The first index assigned is 1, and it goes up by 1 each
# time.
_savepoint_index = 0 _savepoint_index = 0
# If savepoints are used, keep a weak key dict of them # If savepoints are used, keep a weak key dict of them. This maps a
_savepoints = None # savepoint to its index (see above).
_savepoint2index = None
# Remamber the savepoint for the last subtransaction # Remember the savepoint for the last subtransaction.
_subtransaction_savepoint = None _subtransaction_savepoint = None
# Meta data # Meta data. ._extension is also metadata, but is initialized to an
# emtpy dict in __init__.
user = "" user = ""
description = "" description = ""
...@@ -267,26 +270,21 @@ class Transaction(object): ...@@ -267,26 +270,21 @@ class Transaction(object):
resource = DataManagerAdapter(resource) resource = DataManagerAdapter(resource)
self._resources.append(resource) self._resources.append(resource)
if self._savepoint2index:
if self._savepoints:
# A data manager has joined a transaction *after* a savepoint # A data manager has joined a transaction *after* a savepoint
# was created. A couple of things are different in this case: # was created. A couple of things are different in this case:
#
# 1. We need to add it's savepoint to all previous savepoints. # 1. We need to add its savepoint to all previous savepoints.
# so that if they are rolled back, we roll this was back too. # so that if they are rolled back, we roll this one back too.
#
# 2. We don't actualy need to ask it for a savepoint. # 2. We don't actually need to ask the data manager for a
# Because is just joining. We can just abort it to roll # savepoint: because it's just joining, we can just abort it to
# back to the current state, so we simply use and # roll back to the current state, so we simply use an
# AbortSavepoint. # AbortSavepoint.
datamanager_savepoint = AbortSavepoint(resource, self) datamanager_savepoint = AbortSavepoint(resource, self)
for ref in self._savepoints: for transaction_savepoint in self._savepoint2index.keys():
transaction_savepoint = ref() transaction_savepoint._savepoints.append(
if transaction_savepoint is not None: datamanager_savepoint)
transaction_savepoint._savepoints.append(
datamanager_savepoint)
def savepoint(self, optimistic=False): def savepoint(self, optimistic=False):
if self.status is Status.COMMITFAILED: if self.status is Status.COMMITFAILED:
...@@ -298,43 +296,30 @@ class Transaction(object): ...@@ -298,43 +296,30 @@ class Transaction(object):
self._cleanup(self._resources) self._cleanup(self._resources)
self._saveCommitishError() # reraises! self._saveCommitishError() # reraises!
if self._savepoint2index is None:
self._savepoint2index = weakref.WeakKeyDictionary()
self._savepoint_index += 1 self._savepoint_index += 1
ref = weakref.ref(savepoint, self._remove_savepoint_ref) self._savepoint2index[savepoint] = self._savepoint_index
if self._savepoints is None:
self._savepoints = {}
self._savepoints[ref] = self._savepoint_index
return savepoint return savepoint
def _remove_savepoint_ref(self, ref): # Remove `savepoint` from _savepoint2index, and also remove and invalidate
try: # all savepoints we know about with an index larger than `savepoint`'s.
del self._savepoints[ref] # This is what's needed when a rollback _to_ `savepoint` is done.
except KeyError: def _remove_and_invalidate_after(self, savepoint):
pass savepoint2index = self._savepoint2index
index = savepoint2index.pop(savepoint)
def _invalidate_next(self, savepoint):
savepoints = self._savepoints
ref = weakref.ref(savepoint)
index = savepoints[ref]
del savepoints[ref]
# use items to make copy to avoid mutating while iterating # use items to make copy to avoid mutating while iterating
for ref, i in savepoints.items(): for savepoint, i in savepoint2index.items():
if i > index: if i > index:
savepoint = ref()
if savepoint is not None:
savepoint.transaction = None # invalidate
del savepoints[ref]
def _invalidate_savepoints(self):
savepoints = self._savepoints
for ref in savepoints:
savepoint = ref()
if savepoint is not None:
savepoint.transaction = None # invalidate savepoint.transaction = None # invalidate
del savepoint2index[savepoint]
savepoints.clear() # Invalidate and forget about all savepoints.
def _invalidate_all_savepoints(self):
for savepoint in self._savepoint2index.keys():
savepoint.transaction = None # invalidate
self._savepoint2index.clear()
def register(self, obj): def register(self, obj):
...@@ -375,8 +360,8 @@ class Transaction(object): ...@@ -375,8 +360,8 @@ class Transaction(object):
def commit(self, subtransaction=False): def commit(self, subtransaction=False):
if self._savepoints: if self._savepoint2index:
self._invalidate_savepoints() self._invalidate_all_savepoints()
if subtransaction: if subtransaction:
# TODO deprecate subtransactions # TODO deprecate subtransactions
...@@ -492,8 +477,8 @@ class Transaction(object): ...@@ -492,8 +477,8 @@ class Transaction(object):
self._subtransaction_savepoint.rollback() self._subtransaction_savepoint.rollback()
return return
if self._savepoints: if self._savepoint2index:
self._invalidate_savepoints() self._invalidate_all_savepoints()
self._synchronizers.map(lambda s: s.beforeCompletion(self)) self._synchronizers.map(lambda s: s.beforeCompletion(self))
...@@ -647,11 +632,10 @@ class DataManagerAdapter(object): ...@@ -647,11 +632,10 @@ class DataManagerAdapter(object):
return self._datamanager.sortKey() return self._datamanager.sortKey()
class Savepoint: class Savepoint:
"""Transaction savepoint """Transaction savepoint.
Transaction savepoints coordinate savepoints for data managers Transaction savepoints coordinate savepoints for data managers
participating in a transaction. participating in a transaction.
""" """
interface.implements(interfaces.ISavepoint) interface.implements(interfaces.ISavepoint)
...@@ -678,7 +662,7 @@ class Savepoint: ...@@ -678,7 +662,7 @@ class Savepoint:
if transaction is None: if transaction is None:
raise interfaces.InvalidSavepointRollbackError raise interfaces.InvalidSavepointRollbackError
self.transaction = None self.transaction = None
transaction._invalidate_next(self) transaction._remove_and_invalidate_after(self)
try: try:
for savepoint in self._savepoints: for savepoint in self._savepoints:
......
...@@ -24,7 +24,7 @@ support within the transaction system. This data manager is very ...@@ -24,7 +24,7 @@ support within the transaction system. This data manager is very
simple. It provides flat storage of named immutable values, like strings simple. It provides flat storage of named immutable values, like strings
and numbers. and numbers.
>>> import transaction.tests.savepointsample >>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager() >>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob' >>> dm['name'] = 'bob'
...@@ -49,7 +49,7 @@ It allows deposits and debits to be entered for multiple people. ...@@ -49,7 +49,7 @@ It allows deposits and debits to be entered for multiple people.
It accepts a sequence of entries and generates a sequence of status It accepts a sequence of entries and generates a sequence of status
messages. For each entry, it applies the change and then validates messages. For each entry, it applies the change and then validates
the user's account. If the user's account is invalid, we role back the user's account. If the user's account is invalid, we role back
the change for that entry. The success or failure of an entry is the change for that entry. The success or failure of an entry is
indicated in the output status. First we'll initialize some accounts: indicated in the output status. First we'll initialize some accounts:
>>> dm['bob-balance'] = 0.0 >>> dm['bob-balance'] = 0.0
...@@ -65,7 +65,7 @@ Now, we'll define a validation function to validate an account: ...@@ -65,7 +65,7 @@ Now, we'll define a validation function to validate an account:
... raise ValueError('Overdrawn', name) ... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some And a function to apply entries. If the function fails in some
unexpected way, it rolls back all of it's changes and unexpected way, it rolls back all of it's changes and
prints the error: prints the error:
>>> def apply_entries(entries): >>> def apply_entries(entries):
...@@ -107,7 +107,7 @@ Now let's try applying some entries: ...@@ -107,7 +107,7 @@ Now let's try applying some entries:
>>> dm['sally-balance'] >>> dm['sally-balance']
-80.0 -80.0
If we give provide entries that cause an unexpected error: If we give provide entries that cause an unexpected error:
>>> apply_entries([ >>> apply_entries([
...@@ -120,7 +120,7 @@ If we give provide entries that cause an unexpected error: ...@@ -120,7 +120,7 @@ If we give provide entries that cause an unexpected error:
Updated sally Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str' Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because the apply_entries used a savepoint for the entire function, Because the apply_entries used a savepoint for the entire function,
it was able to rollback the partial changes without rolling back it was able to rollback the partial changes without rolling back
changes made in the previous call to apply_entries: changes made in the previous call to apply_entries:
...@@ -199,7 +199,7 @@ support savepoints: ...@@ -199,7 +199,7 @@ support savepoints:
TypeError: ('Savepoints unsupported', {'name': 'bob'}) TypeError: ('Savepoints unsupported', {'name': 'bob'})
>>> transaction.abort() >>> transaction.abort()
However, a flag can be passed to the transaction savepoint method to However, a flag can be passed to the transaction savepoint method to
indicate that databases without savepoint support should be tolerated indicate that databases without savepoint support should be tolerated
until a savepoint is roled back. This allows transactions to proceed until a savepoint is roled back. This allows transactions to proceed
...@@ -259,11 +259,11 @@ with a data manager that didn't support savepoints: ...@@ -259,11 +259,11 @@ with a data manager that didn't support savepoints:
... ...
TypeError: ('Savepoints unsupported', {'name': 'sue'}) TypeError: ('Savepoints unsupported', {'name': 'sue'})
<BLANKLINE> <BLANKLINE>
>>> transaction.abort() >>> transaction.abort()
After clearing the transaction with an abort, we can get on with new After clearing the transaction with an abort, we can get on with new
transactions: transactions:
>>> dm_no_sp['name'] = 'sally' >>> dm_no_sp['name'] = 'sally'
>>> dm['name'] = 'sally' >>> dm['name'] = 'sally'
...@@ -272,4 +272,4 @@ transactions: ...@@ -272,4 +272,4 @@ transactions:
'sally' 'sally'
>>> dm['name'] >>> dm['name']
'sally' 'sally'
...@@ -29,10 +29,10 @@ class SampleDataManager(UserDict.DictMixin): ...@@ -29,10 +29,10 @@ class SampleDataManager(UserDict.DictMixin):
This data manager stores named simple values, like strings and numbers. This data manager stores named simple values, like strings and numbers.
""" """
interface.implements(transaction.interfaces.IDataManager) interface.implements(transaction.interfaces.IDataManager)
def __init__(self, transaction_manager = None): def __init__(self, transaction_manager=None):
if transaction_manager is None: if transaction_manager is None:
# Use the thread-local transaction manager if none is provided: # Use the thread-local transaction manager if none is provided:
transaction_manager = transaction.manager transaction_manager = transaction.manager
...@@ -43,7 +43,7 @@ class SampleDataManager(UserDict.DictMixin): ...@@ -43,7 +43,7 @@ class SampleDataManager(UserDict.DictMixin):
self.uncommitted = self.committed.copy() self.uncommitted = self.committed.copy()
# Our transaction state: # Our transaction state:
#
# If our uncommitted data is modified, we'll join a transaction # If our uncommitted data is modified, we'll join a transaction
# and keep track of the transaction we joined. Any commit # and keep track of the transaction we joined. Any commit
# related messages we get should be for this same transaction # related messages we get should be for this same transaction
...@@ -56,14 +56,14 @@ class SampleDataManager(UserDict.DictMixin): ...@@ -56,14 +56,14 @@ class SampleDataManager(UserDict.DictMixin):
####################################################################### #######################################################################
# Provide a mapping interface to uncommitted data. We provide # Provide a mapping interface to uncommitted data. We provide
# a basic subset of the interface. DictMixin does the rest. # a basic subset of the interface. DictMixin does the rest.
def __getitem__(self, name): def __getitem__(self, name):
return self.uncommitted[name] return self.uncommitted[name]
def __setitem__(self, name, value): def __setitem__(self, name, value):
self._join() # join the current transaction, if we haven't already self._join() # join the current transaction, if we haven't already
self.uncommitted[name] = value self.uncommitted[name] = value
def __delitem__(self, name): def __delitem__(self, name):
self._join() # join the current transaction, if we haven't already self._join() # join the current transaction, if we haven't already
del self.uncommitted[name] del self.uncommitted[name]
...@@ -126,13 +126,13 @@ class SampleDataManager(UserDict.DictMixin): ...@@ -126,13 +126,13 @@ class SampleDataManager(UserDict.DictMixin):
assert self.tpc_phase == 2, "Must be called in second phase of tpc" assert self.tpc_phase == 2, "Must be called in second phase of tpc"
self.committed = self.uncommitted.copy() self.committed = self.uncommitted.copy()
self._resetTransaction() self._resetTransaction()
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
assert transaction is self.transaction, "Must not change transactions" assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase is not None, "Must be called inside of tpc" assert self.tpc_phase is not None, "Must be called inside of tpc"
self.uncommitted = self.committed.copy() self.uncommitted = self.committed.copy()
self._resetTransaction() self._resetTransaction()
# #
####################################################################### #######################################################################
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment