Commit c9fab9ff authored by Barry Warsaw's avatar Barry Warsaw

"Experimental" support for undoing past an object's creation. This

works by writing a metadata record which contains a special lrevid
(pickle pointer).  Upon loading a zombified object, naive applications
will get a KeyError as would be expected for loading a non-existant
object.

The actual exception though is ObjectDoesNotExist -- derived from
KeyError -- which contains a `revid' attribute pointing to the
revision of the object which zombified it.  Thus, by undoing this
revid, the object can be reborn to its initial state.

Specifically,

ObjectDoesNotExist: New exception, derived from KeyError.

load(): If an object's metadata has an lrevid with the special `dne'
value (64-bits full of 1's), the object is a zombie and
ObjectDoesNotExist is raised.

transactionalUndo(): If prevrevid for the revision we're undoing is
zero, it means that we're undoing the object's creation.  Instead of
raising an UndoError, write a new metadata record containing the
original record, but with lrevid == dne, and prevrevid == tid.

This change also contains initial (untested) support for pack(),
specifically,

_zaprevision(): pack doesn't happen in a transaction, so remove the
txn argument.  Add the referencesf argument, passed from the pack()
call.

pack(): Initial implementation.  Seems fairly simple really (too
simple? ;)
parent 629c7393
...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
# $Revision: 1.11 $ # $Revision: 1.12 $
__version__ = '0.1' __version__ = '0.1'
import struct import struct
...@@ -34,6 +34,14 @@ UNDOABLE_TRANSACTION = 'Y' ...@@ -34,6 +34,14 @@ UNDOABLE_TRANSACTION = 'Y'
PROTECTED_TRANSACTION = 'N' PROTECTED_TRANSACTION = 'N'
zero = '\0'*8 zero = '\0'*8
dne = '\377'*8 # does not exist
class ObjectDoesNotExist(KeyError):
def __init__(self, msg, revid):
POSException.VersionError.__init__(self, msg, revid)
self.revid = revid
...@@ -295,7 +303,6 @@ class Full(BerkeleyBase): ...@@ -295,7 +303,6 @@ class Full(BerkeleyBase):
rec = c.next_dup() rec = c.next_dup()
finally: finally:
c.close() c.close()
except: except:
# If any errors whatsoever occurred, abort the transaction with # If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since # Berkeley, leave the commit log file in the PROMISED state (since
...@@ -462,7 +469,7 @@ class Full(BerkeleyBase): ...@@ -462,7 +469,7 @@ class Full(BerkeleyBase):
# #
def load(self, oid, version): def load(self, oid, version):
global zero global zero, dne
# BAW: in the face of application level conflict resolution, it's # BAW: in the face of application level conflict resolution, it's
# /possible/ to load an object that is sitting in the commit log. # /possible/ to load an object that is sitting in the commit log.
# That's bogus though because there's no way to know what to return; # That's bogus though because there's no way to know what to return;
...@@ -490,6 +497,8 @@ class Full(BerkeleyBase): ...@@ -490,6 +497,8 @@ class Full(BerkeleyBase):
if vid == zero and version: if vid == zero and version:
raise POSException.VersionError( raise POSException.VersionError(
'Object not found in version: %s' % version) 'Object not found in version: %s' % version)
if lrevid == dne:
raise ObjectDoesNotExist('Object no longer exists', revid)
if vid == zero or self._versions[vid] == version: if vid == zero or self._versions[vid] == version:
return self._pickles[oid+lrevid], revid return self._pickles[oid+lrevid], revid
# Otherwise, we recognize that an object cannot be stored in more # Otherwise, we recognize that an object cannot be stored in more
...@@ -607,12 +616,13 @@ class Full(BerkeleyBase): ...@@ -607,12 +616,13 @@ class Full(BerkeleyBase):
return self._serial return self._serial
def transactionalUndo(self, tid, transaction): def transactionalUndo(self, tid, transaction):
global zero global zero, dne
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
newrevs = [] newrevs = []
oids = []
c = None c = None
self._lock_acquire() self._lock_acquire()
try: try:
...@@ -643,10 +653,13 @@ class Full(BerkeleyBase): ...@@ -643,10 +653,13 @@ class Full(BerkeleyBase):
revid = self._serials[oid] revid = self._serials[oid]
if revid == tid: if revid == tid:
# We can always undo the last transaction # We can always undo the last transaction
prevrevid = self._metadata[oid+tid][24:] vid, nvrevid, lrevid, prevrevid = struct.unpack(
'>8s8s8s8s', self._metadata[oid+tid])
if prevrevid == zero: if prevrevid == zero:
raise POSException.UndoError, 'Nothing to undo' # We're undoing the object's creation
newrevs.append((oid, self._metadata[oid+prevrevid])) newrevs.append((oid, vid+nvrevid+dne+tid))
else:
newrevs.append((oid, self._metadata[oid+prevrevid]))
else: else:
# We need to compare the lrevid (pickle pointers) of the # We need to compare the lrevid (pickle pointers) of the
# transaction previous to the current one, and the # transaction previous to the current one, and the
...@@ -668,7 +681,6 @@ class Full(BerkeleyBase): ...@@ -668,7 +681,6 @@ class Full(BerkeleyBase):
# Okay, we've checked all the objects affected by the transaction # Okay, we've checked all the objects affected by the transaction
# we're about to undo, and everything looks good. So now we'll # we're about to undo, and everything looks good. So now we'll
# write to the log the new object records we intend to commit. # write to the log the new object records we intend to commit.
oids = []
for oid, metadata in newrevs: for oid, metadata in newrevs:
vid, nvrevid, lrevid, prevrevid = struct.unpack( vid, nvrevid, lrevid, prevrevid = struct.unpack(
'>8s8s8s8s', metadata) '>8s8s8s8s', metadata)
...@@ -801,21 +813,21 @@ class Full(BerkeleyBase): ...@@ -801,21 +813,21 @@ class Full(BerkeleyBase):
tid=self._current[oid] tid=self._current[oid]
finally: self._lock_release() finally: self._lock_release()
def _zaprevision(self, key, txn): def _zaprevision(self, key, referencesf):
# Delete the metadata record pointed to by the key, decrefing the # Delete the metadata record pointed to by the key, decrefing the
# reference counts of the pickle pointed to by this record, and # reference counts of the pickle pointed to by this record, and
# perform cascading decrefs on the referenced objects. # perform cascading decrefs on the referenced objects.
# #
# We need the lrevid which points to the pickle for this revision... # We need the lrevid which points to the pickle for this revision...
vid, nvrevid, lrevid = self._metadata.get(key, txn=txn)[16:24] vid, nvrevid, lrevid = self._metadata.get(key)[16:24]
# ...and now delete the metadata record for this object revision # ...and now delete the metadata record for this object revision
self._metadata.delete(key, txn=txn) self._metadata.delete(key)
# Decref the reference count of the pickle pointed to by oid+lrevid. # Decref the reference count of the pickle pointed to by oid+lrevid.
# If the reference count goes to zero, we can garbage collect the # If the reference count goes to zero, we can garbage collect the
# pickle, and decref all the objects pointed to by the pickle (with of # pickle, and decref all the objects pointed to by the pickle (with of
# course, cascading garbage collection). # course, cascading garbage collection).
pkey = key[:8] + lrevid pkey = key[:8] + lrevid
refcount = self._pickleRefcounts.get(pkey, txn=txn) refcount = self._pickleRefcounts.get(pkey)
# It's possible the pickleRefcounts entry for this oid has already # It's possible the pickleRefcounts entry for this oid has already
# been deleted by a previous pass of _zaprevision(). If so, we're # been deleted by a previous pass of _zaprevision(). If so, we're
# done. # done.
...@@ -823,12 +835,12 @@ class Full(BerkeleyBase): ...@@ -823,12 +835,12 @@ class Full(BerkeleyBase):
return return
refcount = utils.U64(refcount) - 1 refcount = utils.U64(refcount) - 1
if refcount > 0: if refcount > 0:
self._pickleRefcounts.put(pkey, utils.p64(refcount), txn=txn) self._pickleRefcounts.put(pkey, utils.p64(refcount))
return return
# The refcount of this pickle has gone to zero, so we need to garbage # The refcount of this pickle has gone to zero, so we need to garbage
# collect it, and decref all the objects it points to. # collect it, and decref all the objects it points to.
self._pickleRefcounts.delete(pkey, txn=txn) self._pickleRefcounts.delete(pkey)
pickle = self._pickles.get(pkey, txn=txn) pickle = self._pickles.get(pkey)
# Sniff the pickle to get the objects it refers to # Sniff the pickle to get the objects it refers to
collectables = [] collectables = []
refoids = [] refoids = []
...@@ -837,10 +849,10 @@ class Full(BerkeleyBase): ...@@ -837,10 +849,10 @@ class Full(BerkeleyBase):
# goes to zero, remember the oid so we can recursively zap its # goes to zero, remember the oid so we can recursively zap its
# metadata too. # metadata too.
for oid in refoids: for oid in refoids:
refcount = self._refcounts.get(oid, txn=txn) refcount = self._refcounts.get(oid)
refcount = utils.U64(refcount) - 1 refcount = utils.U64(refcount) - 1
if refcount > 0: if refcount > 0:
self._refcounts.put(oid, utils.p64(refcount), txn=txn) self._refcounts.put(oid, utils.p64(refcount))
else: else:
collectables.append(oid) collectables.append(oid)
# Now for all objects whose refcounts just went to zero, we want to # Now for all objects whose refcounts just went to zero, we want to
...@@ -850,8 +862,8 @@ class Full(BerkeleyBase): ...@@ -850,8 +862,8 @@ class Full(BerkeleyBase):
# in the most efficient manner possible. # in the most efficient manner possible.
tids = [] tids = []
for oid in collectables: for oid in collectables:
self._serials.delete(oid, txn=txn) self._serials.delete(oid)
self._refcounts.delete(oid, txn=txn) self._refcounts.delete(oid)
# To delete all the metadata records associated with this object # To delete all the metadata records associated with this object
# id, we use a trick of Berkeley cursor objects to only partially # id, we use a trick of Berkeley cursor objects to only partially
# specify the key. This works because keys are compared # specify the key. This works because keys are compared
...@@ -867,13 +879,13 @@ class Full(BerkeleyBase): ...@@ -867,13 +879,13 @@ class Full(BerkeleyBase):
# it is impossible to pack current records (and hence # it is impossible to pack current records (and hence
# currentVersions). # currentVersions).
tids.append(rec[0][8:]) # second 1/2 of the key tids.append(rec[0][8:]) # second 1/2 of the key
self._zaprevision(rec[0], txn) self._zaprevision(rec[0], referencesf)
rec = c.next() rec = c.next()
finally: finally:
c.close() c.close()
# Delete all the txnoids entries that referenced this oid # Delete all the txnoids entries that referenced this oid
for tid in tids: for tid in tids:
c = self._txnoids.cursor(txn=txn) c = self._txnoids.cursor()
try: try:
rec = c.set_both(tid, oid) rec = c.set_both(tid, oid)
while rec: while rec:
...@@ -885,11 +897,33 @@ class Full(BerkeleyBase): ...@@ -885,11 +897,33 @@ class Full(BerkeleyBase):
c.close() c.close()
def pack(self, t, referencesf): def pack(self, t, referencesf):
# FIXME
self._lock_acquire() self._lock_acquire()
c = None
tidmarks = {}
try: try:
pass # Figure out when to pack to. We happen to know that our
finally: self._lock_release() # transaction ids are really timestamps.
t0 = TimeStamp(t)
c = self._txnoids.cursor()
rec = c.set(`t0`)
while rec:
tid, oid = rec
rec = c.prev_dup()
# We need to mark this transaction as having participated in a
# pack, so that undo will not create a temporal anomaly.
if not tidmarks.has_key(tid):
meta = self._txnMetadata[tid]
self._txnMetadata[tid] = PROTECTED_TRANSACTION + meta[1:]
tidmarks[tid] = 1
# Find out if the oid is current, if so skip it. The oid
# record could be missing from serials if it's already been
# garbage collected.
revid = self._serials.get(oid)
if revid in (None, tid):
continue
self._zaprevision(oid+revid, referencesf)
finally:
self._lock_release()
# Other interface assertions # Other interface assertions
def supportsUndo(self): def supportsUndo(self):
......
...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
# $Revision: 1.11 $ # $Revision: 1.12 $
__version__ = '0.1' __version__ = '0.1'
import struct import struct
...@@ -34,6 +34,14 @@ UNDOABLE_TRANSACTION = 'Y' ...@@ -34,6 +34,14 @@ UNDOABLE_TRANSACTION = 'Y'
PROTECTED_TRANSACTION = 'N' PROTECTED_TRANSACTION = 'N'
zero = '\0'*8 zero = '\0'*8
dne = '\377'*8 # does not exist
class ObjectDoesNotExist(KeyError):
def __init__(self, msg, revid):
POSException.VersionError.__init__(self, msg, revid)
self.revid = revid
...@@ -295,7 +303,6 @@ class Full(BerkeleyBase): ...@@ -295,7 +303,6 @@ class Full(BerkeleyBase):
rec = c.next_dup() rec = c.next_dup()
finally: finally:
c.close() c.close()
except: except:
# If any errors whatsoever occurred, abort the transaction with # If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since # Berkeley, leave the commit log file in the PROMISED state (since
...@@ -462,7 +469,7 @@ class Full(BerkeleyBase): ...@@ -462,7 +469,7 @@ class Full(BerkeleyBase):
# #
def load(self, oid, version): def load(self, oid, version):
global zero global zero, dne
# BAW: in the face of application level conflict resolution, it's # BAW: in the face of application level conflict resolution, it's
# /possible/ to load an object that is sitting in the commit log. # /possible/ to load an object that is sitting in the commit log.
# That's bogus though because there's no way to know what to return; # That's bogus though because there's no way to know what to return;
...@@ -490,6 +497,8 @@ class Full(BerkeleyBase): ...@@ -490,6 +497,8 @@ class Full(BerkeleyBase):
if vid == zero and version: if vid == zero and version:
raise POSException.VersionError( raise POSException.VersionError(
'Object not found in version: %s' % version) 'Object not found in version: %s' % version)
if lrevid == dne:
raise ObjectDoesNotExist('Object no longer exists', revid)
if vid == zero or self._versions[vid] == version: if vid == zero or self._versions[vid] == version:
return self._pickles[oid+lrevid], revid return self._pickles[oid+lrevid], revid
# Otherwise, we recognize that an object cannot be stored in more # Otherwise, we recognize that an object cannot be stored in more
...@@ -607,12 +616,13 @@ class Full(BerkeleyBase): ...@@ -607,12 +616,13 @@ class Full(BerkeleyBase):
return self._serial return self._serial
def transactionalUndo(self, tid, transaction): def transactionalUndo(self, tid, transaction):
global zero global zero, dne
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
newrevs = [] newrevs = []
oids = []
c = None c = None
self._lock_acquire() self._lock_acquire()
try: try:
...@@ -643,10 +653,13 @@ class Full(BerkeleyBase): ...@@ -643,10 +653,13 @@ class Full(BerkeleyBase):
revid = self._serials[oid] revid = self._serials[oid]
if revid == tid: if revid == tid:
# We can always undo the last transaction # We can always undo the last transaction
prevrevid = self._metadata[oid+tid][24:] vid, nvrevid, lrevid, prevrevid = struct.unpack(
'>8s8s8s8s', self._metadata[oid+tid])
if prevrevid == zero: if prevrevid == zero:
raise POSException.UndoError, 'Nothing to undo' # We're undoing the object's creation
newrevs.append((oid, self._metadata[oid+prevrevid])) newrevs.append((oid, vid+nvrevid+dne+tid))
else:
newrevs.append((oid, self._metadata[oid+prevrevid]))
else: else:
# We need to compare the lrevid (pickle pointers) of the # We need to compare the lrevid (pickle pointers) of the
# transaction previous to the current one, and the # transaction previous to the current one, and the
...@@ -668,7 +681,6 @@ class Full(BerkeleyBase): ...@@ -668,7 +681,6 @@ class Full(BerkeleyBase):
# Okay, we've checked all the objects affected by the transaction # Okay, we've checked all the objects affected by the transaction
# we're about to undo, and everything looks good. So now we'll # we're about to undo, and everything looks good. So now we'll
# write to the log the new object records we intend to commit. # write to the log the new object records we intend to commit.
oids = []
for oid, metadata in newrevs: for oid, metadata in newrevs:
vid, nvrevid, lrevid, prevrevid = struct.unpack( vid, nvrevid, lrevid, prevrevid = struct.unpack(
'>8s8s8s8s', metadata) '>8s8s8s8s', metadata)
...@@ -801,21 +813,21 @@ class Full(BerkeleyBase): ...@@ -801,21 +813,21 @@ class Full(BerkeleyBase):
tid=self._current[oid] tid=self._current[oid]
finally: self._lock_release() finally: self._lock_release()
def _zaprevision(self, key, txn): def _zaprevision(self, key, referencesf):
# Delete the metadata record pointed to by the key, decrefing the # Delete the metadata record pointed to by the key, decrefing the
# reference counts of the pickle pointed to by this record, and # reference counts of the pickle pointed to by this record, and
# perform cascading decrefs on the referenced objects. # perform cascading decrefs on the referenced objects.
# #
# We need the lrevid which points to the pickle for this revision... # We need the lrevid which points to the pickle for this revision...
vid, nvrevid, lrevid = self._metadata.get(key, txn=txn)[16:24] vid, nvrevid, lrevid = self._metadata.get(key)[16:24]
# ...and now delete the metadata record for this object revision # ...and now delete the metadata record for this object revision
self._metadata.delete(key, txn=txn) self._metadata.delete(key)
# Decref the reference count of the pickle pointed to by oid+lrevid. # Decref the reference count of the pickle pointed to by oid+lrevid.
# If the reference count goes to zero, we can garbage collect the # If the reference count goes to zero, we can garbage collect the
# pickle, and decref all the objects pointed to by the pickle (with of # pickle, and decref all the objects pointed to by the pickle (with of
# course, cascading garbage collection). # course, cascading garbage collection).
pkey = key[:8] + lrevid pkey = key[:8] + lrevid
refcount = self._pickleRefcounts.get(pkey, txn=txn) refcount = self._pickleRefcounts.get(pkey)
# It's possible the pickleRefcounts entry for this oid has already # It's possible the pickleRefcounts entry for this oid has already
# been deleted by a previous pass of _zaprevision(). If so, we're # been deleted by a previous pass of _zaprevision(). If so, we're
# done. # done.
...@@ -823,12 +835,12 @@ class Full(BerkeleyBase): ...@@ -823,12 +835,12 @@ class Full(BerkeleyBase):
return return
refcount = utils.U64(refcount) - 1 refcount = utils.U64(refcount) - 1
if refcount > 0: if refcount > 0:
self._pickleRefcounts.put(pkey, utils.p64(refcount), txn=txn) self._pickleRefcounts.put(pkey, utils.p64(refcount))
return return
# The refcount of this pickle has gone to zero, so we need to garbage # The refcount of this pickle has gone to zero, so we need to garbage
# collect it, and decref all the objects it points to. # collect it, and decref all the objects it points to.
self._pickleRefcounts.delete(pkey, txn=txn) self._pickleRefcounts.delete(pkey)
pickle = self._pickles.get(pkey, txn=txn) pickle = self._pickles.get(pkey)
# Sniff the pickle to get the objects it refers to # Sniff the pickle to get the objects it refers to
collectables = [] collectables = []
refoids = [] refoids = []
...@@ -837,10 +849,10 @@ class Full(BerkeleyBase): ...@@ -837,10 +849,10 @@ class Full(BerkeleyBase):
# goes to zero, remember the oid so we can recursively zap its # goes to zero, remember the oid so we can recursively zap its
# metadata too. # metadata too.
for oid in refoids: for oid in refoids:
refcount = self._refcounts.get(oid, txn=txn) refcount = self._refcounts.get(oid)
refcount = utils.U64(refcount) - 1 refcount = utils.U64(refcount) - 1
if refcount > 0: if refcount > 0:
self._refcounts.put(oid, utils.p64(refcount), txn=txn) self._refcounts.put(oid, utils.p64(refcount))
else: else:
collectables.append(oid) collectables.append(oid)
# Now for all objects whose refcounts just went to zero, we want to # Now for all objects whose refcounts just went to zero, we want to
...@@ -850,8 +862,8 @@ class Full(BerkeleyBase): ...@@ -850,8 +862,8 @@ class Full(BerkeleyBase):
# in the most efficient manner possible. # in the most efficient manner possible.
tids = [] tids = []
for oid in collectables: for oid in collectables:
self._serials.delete(oid, txn=txn) self._serials.delete(oid)
self._refcounts.delete(oid, txn=txn) self._refcounts.delete(oid)
# To delete all the metadata records associated with this object # To delete all the metadata records associated with this object
# id, we use a trick of Berkeley cursor objects to only partially # id, we use a trick of Berkeley cursor objects to only partially
# specify the key. This works because keys are compared # specify the key. This works because keys are compared
...@@ -867,13 +879,13 @@ class Full(BerkeleyBase): ...@@ -867,13 +879,13 @@ class Full(BerkeleyBase):
# it is impossible to pack current records (and hence # it is impossible to pack current records (and hence
# currentVersions). # currentVersions).
tids.append(rec[0][8:]) # second 1/2 of the key tids.append(rec[0][8:]) # second 1/2 of the key
self._zaprevision(rec[0], txn) self._zaprevision(rec[0], referencesf)
rec = c.next() rec = c.next()
finally: finally:
c.close() c.close()
# Delete all the txnoids entries that referenced this oid # Delete all the txnoids entries that referenced this oid
for tid in tids: for tid in tids:
c = self._txnoids.cursor(txn=txn) c = self._txnoids.cursor()
try: try:
rec = c.set_both(tid, oid) rec = c.set_both(tid, oid)
while rec: while rec:
...@@ -885,11 +897,33 @@ class Full(BerkeleyBase): ...@@ -885,11 +897,33 @@ class Full(BerkeleyBase):
c.close() c.close()
def pack(self, t, referencesf): def pack(self, t, referencesf):
# FIXME
self._lock_acquire() self._lock_acquire()
c = None
tidmarks = {}
try: try:
pass # Figure out when to pack to. We happen to know that our
finally: self._lock_release() # transaction ids are really timestamps.
t0 = TimeStamp(t)
c = self._txnoids.cursor()
rec = c.set(`t0`)
while rec:
tid, oid = rec
rec = c.prev_dup()
# We need to mark this transaction as having participated in a
# pack, so that undo will not create a temporal anomaly.
if not tidmarks.has_key(tid):
meta = self._txnMetadata[tid]
self._txnMetadata[tid] = PROTECTED_TRANSACTION + meta[1:]
tidmarks[tid] = 1
# Find out if the oid is current, if so skip it. The oid
# record could be missing from serials if it's already been
# garbage collected.
revid = self._serials.get(oid)
if revid in (None, tid):
continue
self._zaprevision(oid+revid, referencesf)
finally:
self._lock_release()
# Other interface assertions # Other interface assertions
def supportsUndo(self): def supportsUndo(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment