Commit 65033185 authored by Barry Warsaw's avatar Barry Warsaw

With apologies to future readers of this log entry:

    Woo boy, lots 'o changes.

This represents my current cut at the updated implementation of full
Berkeley storage.  It is as yet untested, it is being checked in
mainly to sync and checkpoint to CVS.
parent 3ec68ae5
from base import Base """Berkeley storage with full undo and versioning support.
See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning.
"""
# $Revision: 1.4 $
__version__ = '0.1'
import struct
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net
from bsddb3 import db from bsddb3 import db
from struct import pack, unpack
import os, tempfile, string, marshal
from ZODB import POSException, utils
from marshal import dump, load
class Full(Base): from ZODB import POSException
from ZODB import utils
def _setupDbs(self): from ZODB.referencesf import referencesf
# Supports Base framework from ZODB import TimeStamp
self._index=self._setupDB('current')
for name in ( # BerkeleyBase.BerkeleyBase class provides some common functionality for both
'pickle', 'record', 'transactions', 'vids', 'versions', # the Full and Minimal implementations. It in turn inherits from
'referenceCount', 'pickleReferenceCount', # ZODB.BaseStorage.BaseStorage which itself provides some common storage
): # functionality.
self._setupDB(name) from BerkeleyBase import BerkeleyBase
from CommitLog import FullLog
self._setupDB('currentVersions', flags=db.DB_DUP)
self._setupDB('transaction_oids', flags=db.DB_DUP) # Flags for transaction status in the transaction metadata table. You can
self._setupDB('references', flags=db.DB_DUP) # only undo back to the last pack, and any transactions before the pack time
# get marked with the PROTECTED_TRANSACTION flag. An attempt to undo past a
c=self._vids.cursor() # PROTECTED_TRANSACTION will raise an POSException.UndoError. By default,
v=c.get(db.DB_LAST) # transactions are marked with the UNDOABLE_TRANSACTION status flag.
if v: self._vid=utils.U64(v[0]) UNDOABLE_TRANSACTION = 'Y'
else: self._vid=0L PROTECTED_TRANSACTION = 'N'
class InternalInconsistencyError(POSException.POSError, AssertError):
"""Raised when we detect an internal inconsistency in our tables."""
class Full(BerkeleyBase):
#
# Overrides of base class methods
#
def _setupDBs(self):
# Data Type Assumptions:
#
# object ids (oid) are 8-bytes
# object serial numbers are 8-bytes
# transaction ids (tid) are 8-bytes
# revision ids (revid) are the same as transaction ids, just used in a
# different context.
# version ids (vid) are 8-bytes
# data pickles are of arbitrary length
#
# Create the tables used to maintain the relevant information. The
# full storage needs a bunch of tables. These two are defined by the
# base class infrastructure and are shared by the Minimal
# implementation.
#
# serials -- {oid -> serial}
# Maps oids to object serial numbers. The serial number is
# essentially a timestamp used to determine if conflicts have
# arisen, and serial numbers double as transaction ids and object
# revision ids. If an attempt is made to store an object with a
# serial number that is different than the current serial number
# for the object, a ConflictError is raised.
#
# pickles -- {(oid+revid) -> pickle}
# Maps the concrete object referenced by oid+revid to that
# object's data pickle.
#
# These are used only by the Full implementation.
#
# vids -- {version_string -> vid}
# Maps version strings (which are arbitrary) to vids.
#
# versions -- {vid -> version_string}
# Maps vids to version strings.
#
# currentVersions -- {vid -> [oid]}
# Maps vids to the oids of the objects modified in that version
# for all current versions (except the 0th version, which is the
# non-version).
#
# metadata -- {oid+revid -> vid+nvrevid+lrevid+previd}
# Maps oid+revid to object metadata. This mapping is used to find
# other information about a particular concrete object revision.
# Essentially it stitches all the other pieces together.
#
# vid is the version id for the concrete object revision, and will
# be zero if the object isn't living in a version.
#
# nvrevid is the revision id pointing to the most current
# non-version concrete object revision. So, if the object is
# living in a version and that version is aborted, the nvrevid
# points to the object revision that will soon be restored.
# nvrevid will be zero if the object was never modified in a
# version.
#
# lrevid is the revision id pointing to object revision's pickle
# state (I think of it as the "live revision id" since it's the
# state that gives life to the concrete object described by this
# metadata record).
#
# prevrevid is the revision id pointing to the previous state of
# the object. This is used for undo.
#
# txnMetadata -- {tid -> status+userlen+desclen+user+desc+ext}
# Maps tids to metadata about a transaction.
#
# Status is a 1-character status flag, which is used by the undo
# mechanism, and has the following values (see constants above):
# 'N' -- This transaction is "pack protected". You can only
# undo back to the last pack, and any transactions
# before the pack time get marked with this flag.
# 'Y' -- It is okay to undo past this transaction.
#
# userlen is the length in characters of the `user' field as an
# 8-byte unsigned long integer
# desclen is the length in characters of the `desc' field as an
# 8-byte unsigned long integer
# user is the user information passed to tpc_finish()
# desc is the description info passed to tpc_finish()
# ext is the extra info passed to tpc_finish(). It is a
# dictionary that we get already pickled by BaseStorage.
#
# txnOids -- {tid -> [oid]}
# Maps transaction ids to the oids of the objects modified by the
# transaction.
#
# refcounts -- {oid -> count}
# Maps objects to their reference counts.
#
# references -- {oid+tid -> [oid]}
# Maps the concrete object referenced by oid+tid to the list of
# objects it references. This is essentially a cache, since we
# could look up the pickle associated with oid+tid and "sniff" the
# pickle for its references.
#
# pickleRefcounts -- {oid+tid -> count}
# Maps the concrete object referenced by oid+tid to the reference
# count of its pickle.
#
# Tables common to the base framework
self._serials = self._setupDB('serials')
self._pickles = self._setupDB('pickles')
# These are specific to the full implementation
self._vids = self._setupDB('vids')
self._versions = self._setupDB('versions')
self._currentVersions = self._setupDB('currentVersions', db.DB_DUP)
self._metadata = self._setupDB('metadata')
self._txnMetadata = self._setupDB('txnMetadata')
self._txnOids = self._setupDB('txnOids', db.DB_DUP)
self._refcounts = self._setupDB('refcounts')
self._references = self._setupDB('references', db.DB_DUP)
self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Initialize our cache of the next available version id.
record = self._versions.cursor().last()
if record:
# Convert to a Python long integer. Note that cursor.last()
# returns key/value, and we want the key (which for the _version
# table is is the vid).
self.__nextvid = utils.U64(vid[0])
else:
self.__nextvid = 0L
def close(self):
self._serials.close()
self._pickles.close()
self._vids.close()
self._versions.close()
self._currentVersions.close()
self._metadata.close()
self._txnMetadata.close()
self._txnOids.close()
self._refcounts.close()
self._references.close()
self._pickleReferenceCount.close()
BerkeleyBase.close(self)
def _begin(self, tid, u, d, e):
# Begin the current transaction. Currently, this just makes sure that
# the commit log is in the proper state.
if self._commitlog is None:
# JF: Chris was getting some weird errors / bizarre behavior from
# Berkeley when using an existing directory or having non-BSDDB
# files in that directory.
self._commitlog = FullLog(dir=self._env.db_home)
self._commitlog.start()
def _vote(self, transaction):
# From here on out, we promise to commit all the registered changes,
# so rewind and put our commit log in the PROMISED state.
self._commitlog.promise()
def _dbnames(self): def _finish(self, tid, u, d, e):
# Supports Base framework # This is called from the storage interface's tpc_finish() method.
return ('current', 'pickle', 'record', # Its responsibilities are to finish the transaction with the
'transactions', 'transaction_oids', # underlying database.
'vids', 'versions', 'currentVersions', #
'referenceCount', 'pickleReferenceCount', # We have a problem here because tpc_finish() is not supposed to raise
'references', # any exceptions. However because finishing with the backend database
) # /can/ cause exceptions, they may be thrown from here as well. If
# that happens, we abort the transaction.
#
# Because of the locking semantics issue described above, finishing
# the transaction in this case involves:
# - starting a transaction with Berkeley DB
# - replaying our commit log for object updates
# - storing those updates in BSDDB
# - committing those changes to BSDDB
#
# Once the changes are committed successfully to BSDDB, we're done
# with our log file.
#
# tid is the transaction id
# u is the user associated with the transaction
# d is the description of the transaction
# e is the transaction extension
zero = '\0'*8
txn = self._env.txn_begin()
try:
# Update the transaction metadata
userlen = len(u)
desclen = len(d)
lengths = struct.pack('>II', userlen, desclen)
# BAW: it's slightly faster to use '%s%s%s%s%s' % ()
# concatentation than string adds, but that will be dependent on
# string length. Those are both faster than using %c as first in
# format string (even though we know the first one is a
# character), and those are faster still than string.join().
self._txnMetadata.put(tid,
UNDOABLE_TRANSACTION + lengths + u + d + e,
txn=txn)
while 1:
rec = self._commitlog.next_object()
if rec is None:
break
op, data = rec
if op == 'o':
# This is a `versioned' object record. Information about
# this object must be stored in the pickle table, the
# object metadata table, the currentVersions tables , and
# the transactions->oid table.
oid, vid, nvrevid, lrevid, pickle, prevrevid = data
key = oid + tid
if pickle:
# This was the result of a store() call which gives us
# a brand new pickle, so we need to update the pickles
# table. The lrevid will be empty, and we make it the
# tid of this transaction
#
# Otherwise, this was the result of a commitVersion()
# or abortVersion() call, essentially moving the
# object to a new version. We don't need to update
# the pickle table because we aren't creating a new
# pickle.
self._pickles.put(key, pickle, txn=txn)
lrevid = tid
# Update the metadata table
self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn)
# If we're in a real version, update this table too
if vid <> zero:
self._currentVersions.put(vid, oid, txn=txn)
self._serials.put(oid, tid, txn=txn)
self._txnOids.put(tid, oid, txn=txn)
# Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and returns
# the list of objects referenced by the pickle. BAW: In
# Zope 2.3.1, which we need to target, the signature of
# this function requires an empty list, but it returns
# that list. In future versions of Zope, there's a
# default argument for that.
for roid in referencesf(pickle, []):
refcount = self._refcounts.get(roid, zero, txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the pickle's reference count. Remember, the
# refcount is stored as a string, so we have to do the
# string->long->string dance.
refcount = self._pickleReferenceCount.get(key, zero,
txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._pickleReferenceCount.put(key, refcount, txn=txn)
elif op == 'v':
# This is a "create-a-version" record
version, vid = data
self._versions.put(vid, version, txn=txn)
self._vids.put(version, vid, txn=txn)
elif op == 'd':
# This is a "delete-a-version" record
vid = data[0]
self._currentVersions.delete(vid, txn=txn)
except:
# If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since
# its changes were never committed), and re-raise the exception.
txn.abort()
raise
else:
# Everything is hunky-dory. Commit the Berkeley transaction, and
# reset the commit log for the next transaction.
txn.commit()
self._closelog()
def abortVersion(self, src, transaction): #
# Do some things in a version
#
def abortVersion(self, version, transaction):
# Abort the version, but retain enough information to make the abort
# undoable.
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
c=0 zero = '\0'*8
c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
newtid=self._serial # The transaction id for this abort
vid=self._vids[src] tid = self._serial
# Let KeyErrors percolate up. This is how we ensure that the
oids=[]; save_oid=oids.append # version we're aborting is not the empty string.
c=self._currentVersions.cursor() vid = self._vids[version]
i=c.set(vid) # We need to keep track of the oids that are affected by the abort
get=c.get # so that we can return it to the connection, which must
current=self._current # invalidate the objects so they can be reloaded.
records=self._record oids = []
storeNV=self._tmp.storeNV c = self._currentVersions.cursor()
zero="\0\0\0\0\0\0\0\0" rec = c.set(vid)
while i: # Now cruise through all the records for this version, looking for
v, oid = i # objects modified in this version, but which were not created in
# this version. For each of these objects, we're going to want to
# Get current record data # write a log entry that will cause the non-version revision of
tid=current[oid] # the object to become current. This preserves the version
record=records[oid+tid] # information for undo.
rvid, nv, data = unpack("8s8s8s", record[:24]) while rec:
if rvid != vid: raise "vid inconsistent with currentVersions" oid = rec[1] # ignore the key
if nv == zero: revid = self._serials[oid]
# This object was created in the version, so there's meta = self._metadata[oid+revid]
# nothing to do. We can skip it. curvid, nvrevid = struct.unpack('8s8s8s', meta[:16])
# Make sure that the vid in the metadata record is the same as
# the vid we sucked out of the vids table, otherwise we've got
# an internal database inconsistency.
if curvid <> vid:
raise InternalInconsistencyError
if nvrevid == zero:
# This object was created in the version, so we don't need
# to do anything about it.
continue continue
# Get the non-version data for the object
# Get non-version data nvmeta = self._metadata[oid+nvrevid]
record=records[oid+nv] curvid, nvrevid, lrevid = unpack('8s8s8s', nvmeta[:24])
rvid, nv, data = unpack("8s8s8s", record[:24]) # We expect curvid to be zero because we just got the
if rvid: raise "expected non-version data" # non-version entry.
if curvid <> zero:
storeNV(oid, data, tid) raise InternalInconsistencyError
# Write the object id, live revision id, and this transaction
save_oid(oid) # id (which serves as the previous revid) to the commit log.
self._commitlog.write_nonversion_object(oid, lrevid, tid)
i=get(db.DB_NEXT_DUP) # Remember to return the oid...
oids.append(oid)
self._tmp.versionDiscard(vid) # ...and get the next record for this vid
rec = c.next_dup()
# We've now processed all the objects on the discarded version, so
# write this to the commit log and return the list of oids to
# invalidate.
self._commitlog.write_discard_version(vid)
return oids return oids
finally: finally:
if c != 0: c.close() if c:
c.close()
self._lock_release() self._lock_release()
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction):
# Commit a source version `src' to a destination version `dest'. It's
# perfectly valid to move an object from one version to another. src
# and dest are version strings, and if we're committing to a
# non-version, dest will be empty.
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
c=0 zero = '\0'*8
c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
newtid=self._serial # The transaction id for this commit
vid=self._vids[src] tid = self._serial
# Get the version ids associated with the source and destination
oids=[]; save_oid=oids.append # version strings.
c=self._currentVersions.cursor() svid = self._vids[src]
i=c.set(vid) if not dest:
get=c.get dvid = zero
current=self._current else:
records=self._record # Find the vid for the destination version, or create one if
store=self._tmp.store # necessary.
zero="\0\0\0\0\0\0\0\0" dvid = self.__findcreatevid(dest)
# Keep track of the oids affected by this commit.
try: dvid=self._vids[dest] oids = []
except KeyError: c = self._currentVersions.cursor()
dvid=self._newvid() rec = c.set(vid)
self._tmp.newVersion(version, vid) # Now cruise through all the records for this version, writing to
# the commit log all the objects changed in this version.
while i: while rec:
v, oid = i oid = rec[1] # ignore the key
revid = self._serials[oid]
# Get current record data meta = self._metadata[oid+revid]
tid=current[oid] curvid, nvrevid, lrevid = struct.unpack('8s8s8s', meta[:24])
record=records[oid+tid] # Our database better be consistent.
rvid, nv, data = unpack("8s8s8s", record[:24]) if curvid <> svid:
if rvid != vid: raise "vid inconsistent with currentVersions" raise InternalInconsistencyError
# If we're committing to a non-version, then the non-version
if not dest: nv=zero # revision id ought to be zero also, regardless of what it was
store(oid,dvid,nv,data,'',tid) # for the source version.
if not dest:
save_oid(oid) nvrevid = zero
self._commitlog.write_moved_object(
i=get(db.DB_NEXT_DUP) oid, dvid, nvrevid, lrevid, tid)
# Remember to return the oid...
self._tmp.versionDiscard(vid) oids.append(oid)
# ...and get the next record for this vid
rec = c.next_dup()
# Now that we're done, we can discard this version
self._commitlog.write_discard_version(vid)
return oids return oids
finally: finally:
if c != 0: c.close() if c:
c.close()
self._lock_release() self._lock_release()
def modifiedInVersion(self, oid):
# Return the version string of the version that contains the most
# recent change to the object. The empty string means the change
# isn't in a version.
self._lock_acquire()
try:
# Let KeyErrors percolate up
revid = self._serials[oid]
vid = self._metadata[oid+revid][:8]
if vid == '\0'*8:
# Not in a version
return ''
return self._versions[vid]
finally:
self._lock_release()
#
# Public storage interface
#
def load(self, oid, version): def load(self, oid, version):
# BAW: in the face of application level conflict resolution, it's
# /possible/ to load an object that is sitting in the commit log.
# That's bogus though because there's no way to know what to return;
# i.e. returning the not-yet-committed state isn't right (because you
# don't know if the txn will be committed or aborted), and returning
# the last committed state doesn't help. So, don't do this!
#
# The solution is, in the Connection object wait to reload the object
# until the transaction has been committed. Still, we don't check for
# this condition, although maybe we should.
self._lock_acquire() self._lock_acquire()
try: try:
t=self._index[oid] # Get the current revid for the object. As per the protocol, let
vid, nv, data = unpack(">8s8s8s", self._record[oid+t][:24]) # any KeyErrors percolate up.
if vid == '\0\0\0\0\0\0\0\0' or self._versions[vid]==version: revid = self._serials[oid]
return self._pickle[oid+data], t # Get the metadata associated with this revision of the object.
t=nv # All we really need is the vid, the non-version revid and the
data = self._record[oid+t][16:24] # pickle pointer revid.
return self._pickle[oid+data], t rec = self._metadata[oid+revid]
finally: self._lock_release() vid, nvrevid, lrevid = struct.unpack('>8s8s8s', rec[:24])
# If the object isn't living in a version, or if the version the
# object is living in is equal to the version that's being
# requested, then we can simply return the pickle referenced by
# the revid.
if vid == '\0'*8 or self._versions[vid] == version:
return self._pickle[oid+lrevid], revid
# Otherwise, we recognize that an object cannot be stored in more
# than one version at a time (although this may change if/when
# "Unlocked" versions are added). So we return the non-version
# revision of the object. BAW: should we assert that version is
# empty in this case?
lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickle[oid+lrevid], nvrevid
finally:
self._lock_release()
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
# Return the revision of the object with the given serial number.
self._lock_acquire() self._lock_acquire()
try: try:
data = self._record[oid+serial][16:24] # Get the pointer to the pickle (i.e. live revid, or lrevid)
return self._pickle[oid+data] # corresponding to the oid and the supplied serial
finally: self._lock_release() # a.k.a. revision.
lrevid = self._metadata[oid+serial][16:24]
return self._pickle[oid+lrevid]
finally:
self._lock_release()
def modifiedInVersion(self, oid): def __findcreatevid(self, version):
self._lock_acquire() # Get the vid associated with a version string, or create one if there
try: # is no vid for the version.
tid=self._current[oid] #
vid=self._record[oid+tid][:8] # First we look for the version in the Berkeley table. If not
if vid == '\0\0\0\0\0\0\0\0': return '' # present, then we look in the commit log to see if a new version
return self._versions[vid] # creation is pending. If still missing, then create the new version
finally: self._lock_release() # and add it to the commit log.
vid = self._vids.get(version)
def _newvid(self): if vid is None:
self._vid=self._vid+1 vid = self._commitlog.get_vid(version)
return utils.p64(self._vid) if vid is None:
self.__nextvid = self.__nextvid + 1
# Convert the int/long version ID into an 8-byte string
vid = utils.p64(self.__nextvid)
self._commitlog.write_new_version(version, vid)
return vid
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -171,322 +524,305 @@ class Full(Base): ...@@ -171,322 +524,305 @@ class Full(Base):
self._lock_acquire() self._lock_acquire()
try: try:
# Check for conflict errors. JF says: under some circumstances,
# it is possible that we'll get two stores for the same object in
# a single transaction. It's not clear though under what
# situations that can occur or what the semantics ought to be.
# For now, we'll assume this doesn't happen.
oserial = orevid = self._serials.get(oid)
if oserial is None:
# There's never been a previous revision of this object, so
# set its non-version revid to zero.
nvrevid = '\0'*8
elif serial <> oserial:
# The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial
# number. Raise a ConflictError.
raise POSException.ConflictError(
'serial number mismatch (was: %s, has: %s)' %
(oserial, utils.U64(serial)))
# Do we already know about this version? If not, we need to
# record the fact that a new version is being created. `version'
# will be the empty string when the transaction is storing on the
# non-version revision of the object.
if version: if version:
try: vid=self._vids[version] vid = self.__findcreatevid(version)
except:
vid=self._newvid()
self._tmp.newVersion(version, vid)
else:
vid=nv='\0\0\0\0\0\0\0\0'
if self._index.has_key(oid):
old=self._index[oid]
if serial != old: raise POSException.ConflictError
ovid, nv = unpack(">8s8s", self._record[oid+old][:16])
if ovid != vid:
raise POSException.VersionLockError, (`oid`, ovid)
if version and ovid == '\0\0\0\0\0\0\0\0': nv=old
else: else:
nv='\0\0\0\0\0\0\0\0' # vid 0 means no explicit version
vid = '\0'*8
self._tmp.store(oid, vid, nv, '', data, old) # A VersionLockError occurs when a particular object is being
# stored on a version different than the last version it was
finally: self._lock_release() # previously stored on (as long as the previous version wasn't
# zero, of course).
#
# Get the old version, which only makes sense if there was a
# previously stored revision of the object.
if orevid:
rec = self._metadata[oid+orevid]
ovid, onvrevid = struct.unpack('>8s8s', rec[:16])
if ovid == '\0'*8:
# The old revision's vid was zero any version is okay.
# But if we're storing this on a version, then the
# non-version revid will be the previous revid for the
# object.
if version:
nvrevid = orevid
elif ovid <> vid:
# The old revision was on a version different than the
# current version. That's a no no.
raise POSException.VersionLockError(
'version mismatch for object %s (was: %s, got: %s)' %
(oid, ovid, vid))
# Record the update to this object in the commit log.
self._commitlog.write_object(oid, vid, nvrevid, data, oserial)
finally:
self._lock_release()
# Return our cached serial number for the object.
return serial return serial
def supportsUndo(self): return 1 def _decref(self, oid, lrevid, txn):
def supportsVersions(self): return 1 # Decref the reference count of the pickle pointed to by oid+lrevid.
# If the reference count goes to zero, we can garbage collect the
def _finish(self, tid, u, d, e): # pickle, and decref all the object pointed to by the pickle (with of
txn=self._env.txn_begin() # course, cascading garbage collection).
try: key = oid + lrevid
tmp=self._tmp refcount = self._pickleReferenceCount.get(key, txn=txn)
ltmp=tmp.tell() refcount = utils.U64(refcount) - 1
if not ltmp: return if refcount > 0:
load=marshal.load self._pickleReferenceCount.put(key, utils.p64(refcount), txn=txn)
tid=self._serial return
records_put=self._records.put # The refcount of this pickle has gone to zero, so we need to garbage
pickles_put=self._pickle.put # collect it, and decref all the objects it points to.
current_put=self._current.put self._pickleReferenceCount.delete(key, txn=txn)
transaction_oids_put=self._transaction_oids.put pickle = self._pickles.get(key, txn=txn)
currentVersions_put=self._currentVersions.put collectedOids = []
l=pack(">HI",len(u), len(d)) for roid in referencesf(pickle, []):
self._transactions.put(tid, ' '+l+u+d+e, txn) refcount = self._refcounts.get(roid, txn=txn)
while ltmp: refcount = utils.U64(refcount) - 1
try: op, arg = load(tmp) if refcount > 0:
except EOFError: self._refcounts.put(roid, utils.p64(refcount), txn=txn)
if tmp.tell()==ltmp: ltmp=0 else:
else: raise # This sucker gets garbage collected itself.
else: self._refcounts.delete(roid, txn=txn)
if op=='s': collectedOids.append(roid)
# store data # Now, for all the objects whose refcounts just went to zero, we need
oid, vid, nv, back, data, pre = arg # to recursively decref their pickles.
key=oid+tid for roid in collectedOids:
if data: serial = self._serials.get(roid, txn=txn)
pickles_pud(key, data, txn) # The pickle for this metadata record is pointed to by lrevid
data=tid lrevid = self._metadata.get(roid+serial, txn=txn)[16:24]
else: # Now recurse...
data=back self._decref(roid, lrevid, txn)
records_put(key, vid+nv+data+pre, txn)
if vid != '/0/0/0/0/0/0/0/0':
versions_put(vid, oid, txn)
current_put(oid, tid, txn)
transaction_oids_put(tid, oid, txn)
elif op='d':
# discard a version (part of version commit and abort)
self._currentVersions.delete(arg, txn)
elif op='v':
# save a version definition
vid, version = arg
self._versions.put(vid, version, txn)
self._vids.put(version, vid, txn)
except:
txn.abort()
raise
else:
txn.commit()
def _undoable(self, txn):
txn.abort()
raise POSException.UndoError, 'Undoable transaction'
def undo(self, tid): def undo(self, tid):
# Attempt to undo transaction. NOTE: the current storage interface
# documentation says that this method takes a third argument, which is
# a limit on the number of oids to return. JF says, "Let's get rid of
# the third argument."
c = None # txnOids cursor
oids = []
self._lock_acquire() self._lock_acquire()
try: try:
status = self._transactions[tid][:1] # Make sure the transaction is undoable. If this transaction
if status == 'p': # occurred earlier than a pack operation, it is no longer
raise POSException.UndoError, 'Undoable transaction' # undoable. The status flag indicates its undoability.
status = self._txnMetadata[tid][1]
txn=self._env.txn_begin() if status == PROTECTED_TRANSACTION:
raise POSException.UndoError, 'Transaction cannot be undone'
current=self._current # Create the cursor and begin the transaction
record=self._record zero = '\0'*8
pickle=self._pickle txn = self._env.txn_begin()
currentVersions=self._currentVersions c = self._txnOids.cursor()
unpack=struct.unpack
try: try:
for oid in dups(self._transaction_oids raise POSException.UndoError, 'Undoable transaction' rec = c.set(tid)
while rec:
txn=self._env.txn_begin() oid = rec[1]
oids.append(oid)
current=self._current # Make sure the tid is current
record=self._record if self._serials.get(oid, txn=txn) <> tid:
pickle=self._pickle # BAW: we can only undo the most current revision of
currentVersions=self._currentVersions # the object???
unpack=struct.unpack raise POSException.UndoError(
"Not object's current transaction")
try: key = oid + tid
for oid in dups(self._transaction_oids, tid, txn): # Get the metadata for this object revision, and then
if current.get(oid, txn) != tid: self._undoable(txn) # delete the metadata record.
key=oid+tid vid, nvrevid, lrevid, prevrevid = struct.unpack(
vid, nv, data, pre = unpack("8s8s8s8s", '8s8s8s8s', self._metadata.get(key, txn=txn))
record.get(key, txn)) self._metadata.delete(key, txn=txn)
record.delete(key, txn) # Decref the reference count of the pickle that we're
if data==tid: pickle.delete(key, txn) # pointing to and garbage collect it if the refcount falls
if pre == '\0\0\0\0\0\0\0\0': # to zero.
current.delete(oid, txn) self._decref(oid, lrevid, txn)
# If the prevrevid is zero, then we've just undone the
# creation of this object, so we can get rid of its
# serials record. Otherwise, update the serials record to
# point to the previous revision of the object.
if prevrevid == zero:
self._serials.delete(oid, txn=txn)
else: else:
current.put(oid, pre, txn) self._serials.put(oid, prevrevid, txn=txn)
try: pvid=record.get(oid+pre, txn) prec = self._metadata.get(oid+prevrevid, txn=txn)
except KeyError: self._undoable(txn) # BAW: what does it mean if the metadata for the
if pvid != vid: # previous revision of the object doesn't exist???
if vid != '\0\0\0\0\0\0\0\0': if not prec:
del_dup(currentVersions, vid, oid, txn) raise POSException.UndoError(
if pvid != '\0\0\0\0\0\0\0\0': "No previous revision for object")
currentVersions.put(pvid, oid, txn) pvid = prec[:8]
# If the version for the previous revision of the
self._transactions.delete(tid, txn) # object is different than the current revision of the
self._transaction_oids.delete(tid, txn) # object, then we're undoing past a version creation,
# so we can delete the entry for this vid/oid pair in
# the currentVersions table.
if pvid <> vid:
# Don't delete the non-version revision of the
# object.
if vid <> zero:
tmpc = self._currentVersions.cursor(txn=txn)
try:
rec = tmpc.get_both(vid, oid)
if rec:
tmpc.delete()
finally:
tmpc.close()
if pvid <> zero:
# Make the previous version the current one
self._currentVersions.put(pvid, oid, txn=txn)
# Finally, delete the transaction metadata associated with
# the transaction we just undid.
self._txnMetadata.delete(tid, txn=txn)
self._txnOids.delete(tid, txn=txn)
except: except:
txn.abort() txn.abort()
raise raise
else: else:
txn.commit() txn.commit()
return oids
finally: self._lock_release() finally:
if c:
c.close()
self._lock_release()
def undoLog(self, first, last, filter=None): def undoLog(self, first, last, filter=None):
# Get a list of transaction ids that can be undone, based on the
# determination of the filter. filter is a function which takes a
# transaction id and returns true or false.
#
# Note that this method has been deprecated by undoInfo() which itself
# has some flaws, but is the best we have now. We don't actually need
# to implement undoInfo() because BaseStorage (which we eventually
# inherit from) mixes in the UndoLogCompatible class which provides an
# implementation written in terms of undoLog().
#
c = None # tnxMetadata cursor
txnDescriptions = [] # the return value
i = 0 # first <= i < last
self._lock_acquire() self._lock_acquire()
try: try:
c=self._transactions.cursor() c = self._txnMetadata.cursor()
try: # We start at the last transaction and scan backwards because we
i=0; r=[]; a=r.append # can stop early if we find a transaction that is earlier than a
data=c.get(db.DB_LAST) # pack. We still have the potential to scan through all the
while data and i < last: # transactions.
tid, data = data rec = c.get_last()
status = data[:1] while rec and i < last:
if status == 'p': break tid, data = rec
luser, ldesc = unpack("HI", data[1:17]) status = data[0]
user=data[17:luser+17] if status == PROTECTED_TRANSACTION:
desc=data[luser+17:luser+17+ldesc] break
ext=data[luser+17+ldesc:] userlen, desclen = struct.unpack('>II', data[1:17])
user = data[17:17+userlen]
data={'id': tid, desc = data[17+userlen:17+userlen+desclen]
'time': TimeStamp(tid).timeTime(), ext = data[17+userlen+desclen:]
'user_name': user or '', # Create a dictionary for the TransactionDescription
'description': desc or '', txndesc = {'id' : tid,
} 'time' : TimeStamp(tid).timeTime(),
if ext: 'user_name' : user,
try: 'description': desc,
ext=loads(ext) }
data.update(ext) # The extension stuff is a picklable mapping, so if we can
except: pass # unpickle it, we update the TransactionDescription dictionary
# with that data. BAW: The bare except is moderately
if filter is None or filter(data): # disgusting, but I'm too lazy to figure out what exceptions
if i >= first: a(data) # could actually be raised here...
i=i+1 if ext:
try:
data=c.get(db.DB_PREV) txndesc.update(pickle.loads(ext))
except:
return r pass
# Now call the filter to see if this transaction should be
finally: c.close() # added to the return list...
finally: self._lock_release() if filter is None or filter(txndesc):
# ...and see if this is within the requested ordinals
if i >= first:
txnDescriptions.append(txndesc)
i = i + 1
# And get the previous record
rec = c.get_prev()
return txnDescriptions
finally:
if c:
c.close()
self._lock_release()
def versionEmpty(self, version): def versionEmpty(self, version):
# Return true if version is empty.
self._lock_acquire() self._lock_acquire()
try: try:
try: self._currentVersions[self._vids[version]] # Let these KeyError exceptions percolate up
except KeyError: return 1 vid = self._vids[version]
else: return 0 # But catch these, because it means the version is empty
finally: self._lock_release() if self._currentVersions.has_key(vid):
return 1
else:
return 0
finally:
self._lock_release()
def versions(self, max=None): def versions(self, max=None):
# Return the list of current versions, as strings, up to the maximum
# requested.
self._lock_acquire() self._lock_acquire()
c = None
try: try:
c=self._currentVersions.cursor() c = self._currentVersions.cursor()
try: rec = c.first()
try: data=c.get(db.DB_NEXT_NODUP) retval = []
except: return () while rec and (max is None or max > 0):
r=[]; a=r.append # currentVersions maps vids to [oid]'s so dig the key out of
while data: # the returned record and look the vid up in the
a(data[0]) # vids->versions table.
data=c.get(db.DB_NEXT_NODUP) retval.append(self._versions[rec[0]])
# Since currentVersions has duplicates (i.e. multiple vid keys
return r # with different oids), get the next record that has a
# different key than the current one.
finally: c.close() rec = c.next_nodup()
finally: self._lock_release() if max is not None:
max = max - 1
return retval
finally:
if c:
c.close()
self._lock_release()
def history(self, oid, version=None, length=1, filter=None): def history(self, oid, version=None, length=1, filter=None):
# FIXME
self._lock_acquire() self._lock_acquire()
try: try:
tid=self._current[oid] tid=self._current[oid]
finally: self._lock_release() finally: self._lock_release()
def pack(self, t, referencesf): def pack(self, t, referencesf):
# FIXME
self._lock_acquire() self._lock_acquire()
try: try:
pass
finally: self._lock_release() finally: self._lock_release()
# Other interface assertions
def supportsUndo(self):
return 1
class dups: def supportsVersions(self):
"""Iterator for duplicate-record databases" return 1
def __init__(self, db, key, txn=0):
if txn==0:
c=db.cursor()
else:
c=db.cursor(txn)
self._v=c.set(key)
self._g=c.get
self._i=0
def __getitem__(self, index):
i=self._i
if index==i: return self._v
if index < i or i < 0: raise IndexError, index
while i < index:
v=self._g(db.DB_NEXT_DUP)
if v:
i=i+1
else:
self._i=-1
raise IndexError, index
self._i=i
self._v=v
return v
def del_dup(database, key, value, txn):
c=database.cursor(txn)
try:
c.getBoth(key, value)
c.delete()
finally:
c.close()
class Log:
def __init__(self, file):
self._file=file
file.seek(0)
h=file.read(5)
size=0
if len(h) == 5:
state=h[0]
if state=='c':
size=unpack(">i", h[1:])
else:
state='s'
file.seek(0)
file.write('s\0\0\0\0')
self._state=state
self._size=size
def clear(self):
if self._state=='p':
raise "Can't clear state with uncommitted promised log"
self._file.seek(0)
self._file.write('s')
self._size=0
def promise(self):
self._state='p'
size=self._file.tell()-5
self._file.seek(0)
self._file.write('p'+pack(">i", size))
self._size=size
def commit(self):
file=self._file
l=file.tell()-1
if l:
self._file.seek(0)
self._file.write('c')
self._state='c'
def store(self, oid, vid, nv, dataptr, pickle, previous,
dump=marshal.dump):
dump(('s',(oid,vid,nv,data,pickle,previous)), self._file)
def storeNV(self, oid, data, tid,
dump=marshal.dump, zero='\0\0\0\0\0\0\0\0'):
dump(('s',(oid,zero,zero,data,'',tid)), self._file)
def versionDiscard(self, vid,
dump=marshal.dump):
dump(('d',(vid)), self._file)
def newVersion(self, version, vid,
dump=marshal.dump):
dump(('v',(version, vid)), self._file)
from base import Base """Berkeley storage with full undo and versioning support.
See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning.
"""
# $Revision: 1.4 $
__version__ = '0.1'
import struct
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net
from bsddb3 import db from bsddb3 import db
from struct import pack, unpack
import os, tempfile, string, marshal
from ZODB import POSException, utils
from marshal import dump, load
class Full(Base): from ZODB import POSException
from ZODB import utils
def _setupDbs(self): from ZODB.referencesf import referencesf
# Supports Base framework from ZODB import TimeStamp
self._index=self._setupDB('current')
for name in ( # BerkeleyBase.BerkeleyBase class provides some common functionality for both
'pickle', 'record', 'transactions', 'vids', 'versions', # the Full and Minimal implementations. It in turn inherits from
'referenceCount', 'pickleReferenceCount', # ZODB.BaseStorage.BaseStorage which itself provides some common storage
): # functionality.
self._setupDB(name) from BerkeleyBase import BerkeleyBase
from CommitLog import FullLog
self._setupDB('currentVersions', flags=db.DB_DUP)
self._setupDB('transaction_oids', flags=db.DB_DUP) # Flags for transaction status in the transaction metadata table. You can
self._setupDB('references', flags=db.DB_DUP) # only undo back to the last pack, and any transactions before the pack time
# get marked with the PROTECTED_TRANSACTION flag. An attempt to undo past a
c=self._vids.cursor() # PROTECTED_TRANSACTION will raise an POSException.UndoError. By default,
v=c.get(db.DB_LAST) # transactions are marked with the UNDOABLE_TRANSACTION status flag.
if v: self._vid=utils.U64(v[0]) UNDOABLE_TRANSACTION = 'Y'
else: self._vid=0L PROTECTED_TRANSACTION = 'N'
class InternalInconsistencyError(POSException.POSError, AssertError):
"""Raised when we detect an internal inconsistency in our tables."""
class Full(BerkeleyBase):
#
# Overrides of base class methods
#
def _setupDBs(self):
# Data Type Assumptions:
#
# object ids (oid) are 8-bytes
# object serial numbers are 8-bytes
# transaction ids (tid) are 8-bytes
# revision ids (revid) are the same as transaction ids, just used in a
# different context.
# version ids (vid) are 8-bytes
# data pickles are of arbitrary length
#
# Create the tables used to maintain the relevant information. The
# full storage needs a bunch of tables. These two are defined by the
# base class infrastructure and are shared by the Minimal
# implementation.
#
# serials -- {oid -> serial}
# Maps oids to object serial numbers. The serial number is
# essentially a timestamp used to determine if conflicts have
# arisen, and serial numbers double as transaction ids and object
# revision ids. If an attempt is made to store an object with a
# serial number that is different than the current serial number
# for the object, a ConflictError is raised.
#
# pickles -- {(oid+revid) -> pickle}
# Maps the concrete object referenced by oid+revid to that
# object's data pickle.
#
# These are used only by the Full implementation.
#
# vids -- {version_string -> vid}
# Maps version strings (which are arbitrary) to vids.
#
# versions -- {vid -> version_string}
# Maps vids to version strings.
#
# currentVersions -- {vid -> [oid]}
# Maps vids to the oids of the objects modified in that version
# for all current versions (except the 0th version, which is the
# non-version).
#
# metadata -- {oid+revid -> vid+nvrevid+lrevid+previd}
# Maps oid+revid to object metadata. This mapping is used to find
# other information about a particular concrete object revision.
# Essentially it stitches all the other pieces together.
#
# vid is the version id for the concrete object revision, and will
# be zero if the object isn't living in a version.
#
# nvrevid is the revision id pointing to the most current
# non-version concrete object revision. So, if the object is
# living in a version and that version is aborted, the nvrevid
# points to the object revision that will soon be restored.
# nvrevid will be zero if the object was never modified in a
# version.
#
# lrevid is the revision id pointing to object revision's pickle
# state (I think of it as the "live revision id" since it's the
# state that gives life to the concrete object described by this
# metadata record).
#
# prevrevid is the revision id pointing to the previous state of
# the object. This is used for undo.
#
# txnMetadata -- {tid -> status+userlen+desclen+user+desc+ext}
# Maps tids to metadata about a transaction.
#
# Status is a 1-character status flag, which is used by the undo
# mechanism, and has the following values (see constants above):
# 'N' -- This transaction is "pack protected". You can only
# undo back to the last pack, and any transactions
# before the pack time get marked with this flag.
# 'Y' -- It is okay to undo past this transaction.
#
# userlen is the length in characters of the `user' field as an
# 8-byte unsigned long integer
# desclen is the length in characters of the `desc' field as an
# 8-byte unsigned long integer
# user is the user information passed to tpc_finish()
# desc is the description info passed to tpc_finish()
# ext is the extra info passed to tpc_finish(). It is a
# dictionary that we get already pickled by BaseStorage.
#
# txnOids -- {tid -> [oid]}
# Maps transaction ids to the oids of the objects modified by the
# transaction.
#
# refcounts -- {oid -> count}
# Maps objects to their reference counts.
#
# references -- {oid+tid -> [oid]}
# Maps the concrete object referenced by oid+tid to the list of
# objects it references. This is essentially a cache, since we
# could look up the pickle associated with oid+tid and "sniff" the
# pickle for its references.
#
# pickleRefcounts -- {oid+tid -> count}
# Maps the concrete object referenced by oid+tid to the reference
# count of its pickle.
#
# Tables common to the base framework
self._serials = self._setupDB('serials')
self._pickles = self._setupDB('pickles')
# These are specific to the full implementation
self._vids = self._setupDB('vids')
self._versions = self._setupDB('versions')
self._currentVersions = self._setupDB('currentVersions', db.DB_DUP)
self._metadata = self._setupDB('metadata')
self._txnMetadata = self._setupDB('txnMetadata')
self._txnOids = self._setupDB('txnOids', db.DB_DUP)
self._refcounts = self._setupDB('refcounts')
self._references = self._setupDB('references', db.DB_DUP)
self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Initialize our cache of the next available version id.
record = self._versions.cursor().last()
if record:
# Convert to a Python long integer. Note that cursor.last()
# returns key/value, and we want the key (which for the _version
# table is is the vid).
self.__nextvid = utils.U64(vid[0])
else:
self.__nextvid = 0L
def close(self):
self._serials.close()
self._pickles.close()
self._vids.close()
self._versions.close()
self._currentVersions.close()
self._metadata.close()
self._txnMetadata.close()
self._txnOids.close()
self._refcounts.close()
self._references.close()
self._pickleReferenceCount.close()
BerkeleyBase.close(self)
def _begin(self, tid, u, d, e):
# Begin the current transaction. Currently, this just makes sure that
# the commit log is in the proper state.
if self._commitlog is None:
# JF: Chris was getting some weird errors / bizarre behavior from
# Berkeley when using an existing directory or having non-BSDDB
# files in that directory.
self._commitlog = FullLog(dir=self._env.db_home)
self._commitlog.start()
def _vote(self, transaction):
# From here on out, we promise to commit all the registered changes,
# so rewind and put our commit log in the PROMISED state.
self._commitlog.promise()
def _dbnames(self): def _finish(self, tid, u, d, e):
# Supports Base framework # This is called from the storage interface's tpc_finish() method.
return ('current', 'pickle', 'record', # Its responsibilities are to finish the transaction with the
'transactions', 'transaction_oids', # underlying database.
'vids', 'versions', 'currentVersions', #
'referenceCount', 'pickleReferenceCount', # We have a problem here because tpc_finish() is not supposed to raise
'references', # any exceptions. However because finishing with the backend database
) # /can/ cause exceptions, they may be thrown from here as well. If
# that happens, we abort the transaction.
#
# Because of the locking semantics issue described above, finishing
# the transaction in this case involves:
# - starting a transaction with Berkeley DB
# - replaying our commit log for object updates
# - storing those updates in BSDDB
# - committing those changes to BSDDB
#
# Once the changes are committed successfully to BSDDB, we're done
# with our log file.
#
# tid is the transaction id
# u is the user associated with the transaction
# d is the description of the transaction
# e is the transaction extension
zero = '\0'*8
txn = self._env.txn_begin()
try:
# Update the transaction metadata
userlen = len(u)
desclen = len(d)
lengths = struct.pack('>II', userlen, desclen)
# BAW: it's slightly faster to use '%s%s%s%s%s' % ()
# concatentation than string adds, but that will be dependent on
# string length. Those are both faster than using %c as first in
# format string (even though we know the first one is a
# character), and those are faster still than string.join().
self._txnMetadata.put(tid,
UNDOABLE_TRANSACTION + lengths + u + d + e,
txn=txn)
while 1:
rec = self._commitlog.next_object()
if rec is None:
break
op, data = rec
if op == 'o':
# This is a `versioned' object record. Information about
# this object must be stored in the pickle table, the
# object metadata table, the currentVersions tables , and
# the transactions->oid table.
oid, vid, nvrevid, lrevid, pickle, prevrevid = data
key = oid + tid
if pickle:
# This was the result of a store() call which gives us
# a brand new pickle, so we need to update the pickles
# table. The lrevid will be empty, and we make it the
# tid of this transaction
#
# Otherwise, this was the result of a commitVersion()
# or abortVersion() call, essentially moving the
# object to a new version. We don't need to update
# the pickle table because we aren't creating a new
# pickle.
self._pickles.put(key, pickle, txn=txn)
lrevid = tid
# Update the metadata table
self._metadata.put(key, vid+nvrevid+tid+prevrevid, txn=txn)
# If we're in a real version, update this table too
if vid <> zero:
self._currentVersions.put(vid, oid, txn=txn)
self._serials.put(oid, tid, txn=txn)
self._txnOids.put(tid, oid, txn=txn)
# Boost the refcount of all the objects referred to by
# this pickle. referencesf() scans a pickle and returns
# the list of objects referenced by the pickle. BAW: In
# Zope 2.3.1, which we need to target, the signature of
# this function requires an empty list, but it returns
# that list. In future versions of Zope, there's a
# default argument for that.
for roid in referencesf(pickle, []):
refcount = self._refcounts.get(roid, zero, txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._refcounts.put(roid, refcount, txn=txn)
# Update the pickle's reference count. Remember, the
# refcount is stored as a string, so we have to do the
# string->long->string dance.
refcount = self._pickleReferenceCount.get(key, zero,
txn=txn)
refcount = utils.p64(utils.U64(refcount) + 1)
self._pickleReferenceCount.put(key, refcount, txn=txn)
elif op == 'v':
# This is a "create-a-version" record
version, vid = data
self._versions.put(vid, version, txn=txn)
self._vids.put(version, vid, txn=txn)
elif op == 'd':
# This is a "delete-a-version" record
vid = data[0]
self._currentVersions.delete(vid, txn=txn)
except:
# If any errors whatsoever occurred, abort the transaction with
# Berkeley, leave the commit log file in the PROMISED state (since
# its changes were never committed), and re-raise the exception.
txn.abort()
raise
else:
# Everything is hunky-dory. Commit the Berkeley transaction, and
# reset the commit log for the next transaction.
txn.commit()
self._closelog()
def abortVersion(self, src, transaction): #
# Do some things in a version
#
def abortVersion(self, version, transaction):
# Abort the version, but retain enough information to make the abort
# undoable.
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
c=0 zero = '\0'*8
c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
newtid=self._serial # The transaction id for this abort
vid=self._vids[src] tid = self._serial
# Let KeyErrors percolate up. This is how we ensure that the
oids=[]; save_oid=oids.append # version we're aborting is not the empty string.
c=self._currentVersions.cursor() vid = self._vids[version]
i=c.set(vid) # We need to keep track of the oids that are affected by the abort
get=c.get # so that we can return it to the connection, which must
current=self._current # invalidate the objects so they can be reloaded.
records=self._record oids = []
storeNV=self._tmp.storeNV c = self._currentVersions.cursor()
zero="\0\0\0\0\0\0\0\0" rec = c.set(vid)
while i: # Now cruise through all the records for this version, looking for
v, oid = i # objects modified in this version, but which were not created in
# this version. For each of these objects, we're going to want to
# Get current record data # write a log entry that will cause the non-version revision of
tid=current[oid] # the object to become current. This preserves the version
record=records[oid+tid] # information for undo.
rvid, nv, data = unpack("8s8s8s", record[:24]) while rec:
if rvid != vid: raise "vid inconsistent with currentVersions" oid = rec[1] # ignore the key
if nv == zero: revid = self._serials[oid]
# This object was created in the version, so there's meta = self._metadata[oid+revid]
# nothing to do. We can skip it. curvid, nvrevid = struct.unpack('8s8s8s', meta[:16])
# Make sure that the vid in the metadata record is the same as
# the vid we sucked out of the vids table, otherwise we've got
# an internal database inconsistency.
if curvid <> vid:
raise InternalInconsistencyError
if nvrevid == zero:
# This object was created in the version, so we don't need
# to do anything about it.
continue continue
# Get the non-version data for the object
# Get non-version data nvmeta = self._metadata[oid+nvrevid]
record=records[oid+nv] curvid, nvrevid, lrevid = unpack('8s8s8s', nvmeta[:24])
rvid, nv, data = unpack("8s8s8s", record[:24]) # We expect curvid to be zero because we just got the
if rvid: raise "expected non-version data" # non-version entry.
if curvid <> zero:
storeNV(oid, data, tid) raise InternalInconsistencyError
# Write the object id, live revision id, and this transaction
save_oid(oid) # id (which serves as the previous revid) to the commit log.
self._commitlog.write_nonversion_object(oid, lrevid, tid)
i=get(db.DB_NEXT_DUP) # Remember to return the oid...
oids.append(oid)
self._tmp.versionDiscard(vid) # ...and get the next record for this vid
rec = c.next_dup()
# We've now processed all the objects on the discarded version, so
# write this to the commit log and return the list of oids to
# invalidate.
self._commitlog.write_discard_version(vid)
return oids return oids
finally: finally:
if c != 0: c.close() if c:
c.close()
self._lock_release() self._lock_release()
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction):
# Commit a source version `src' to a destination version `dest'. It's
# perfectly valid to move an object from one version to another. src
# and dest are version strings, and if we're committing to a
# non-version, dest will be empty.
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
c=0 zero = '\0'*8
c = None # the currentVersions cursor
self._lock_acquire() self._lock_acquire()
try: try:
newtid=self._serial # The transaction id for this commit
vid=self._vids[src] tid = self._serial
# Get the version ids associated with the source and destination
oids=[]; save_oid=oids.append # version strings.
c=self._currentVersions.cursor() svid = self._vids[src]
i=c.set(vid) if not dest:
get=c.get dvid = zero
current=self._current else:
records=self._record # Find the vid for the destination version, or create one if
store=self._tmp.store # necessary.
zero="\0\0\0\0\0\0\0\0" dvid = self.__findcreatevid(dest)
# Keep track of the oids affected by this commit.
try: dvid=self._vids[dest] oids = []
except KeyError: c = self._currentVersions.cursor()
dvid=self._newvid() rec = c.set(vid)
self._tmp.newVersion(version, vid) # Now cruise through all the records for this version, writing to
# the commit log all the objects changed in this version.
while i: while rec:
v, oid = i oid = rec[1] # ignore the key
revid = self._serials[oid]
# Get current record data meta = self._metadata[oid+revid]
tid=current[oid] curvid, nvrevid, lrevid = struct.unpack('8s8s8s', meta[:24])
record=records[oid+tid] # Our database better be consistent.
rvid, nv, data = unpack("8s8s8s", record[:24]) if curvid <> svid:
if rvid != vid: raise "vid inconsistent with currentVersions" raise InternalInconsistencyError
# If we're committing to a non-version, then the non-version
if not dest: nv=zero # revision id ought to be zero also, regardless of what it was
store(oid,dvid,nv,data,'',tid) # for the source version.
if not dest:
save_oid(oid) nvrevid = zero
self._commitlog.write_moved_object(
i=get(db.DB_NEXT_DUP) oid, dvid, nvrevid, lrevid, tid)
# Remember to return the oid...
self._tmp.versionDiscard(vid) oids.append(oid)
# ...and get the next record for this vid
rec = c.next_dup()
# Now that we're done, we can discard this version
self._commitlog.write_discard_version(vid)
return oids return oids
finally: finally:
if c != 0: c.close() if c:
c.close()
self._lock_release() self._lock_release()
def modifiedInVersion(self, oid):
# Return the version string of the version that contains the most
# recent change to the object. The empty string means the change
# isn't in a version.
self._lock_acquire()
try:
# Let KeyErrors percolate up
revid = self._serials[oid]
vid = self._metadata[oid+revid][:8]
if vid == '\0'*8:
# Not in a version
return ''
return self._versions[vid]
finally:
self._lock_release()
#
# Public storage interface
#
def load(self, oid, version): def load(self, oid, version):
# BAW: in the face of application level conflict resolution, it's
# /possible/ to load an object that is sitting in the commit log.
# That's bogus though because there's no way to know what to return;
# i.e. returning the not-yet-committed state isn't right (because you
# don't know if the txn will be committed or aborted), and returning
# the last committed state doesn't help. So, don't do this!
#
# The solution is, in the Connection object wait to reload the object
# until the transaction has been committed. Still, we don't check for
# this condition, although maybe we should.
self._lock_acquire() self._lock_acquire()
try: try:
t=self._index[oid] # Get the current revid for the object. As per the protocol, let
vid, nv, data = unpack(">8s8s8s", self._record[oid+t][:24]) # any KeyErrors percolate up.
if vid == '\0\0\0\0\0\0\0\0' or self._versions[vid]==version: revid = self._serials[oid]
return self._pickle[oid+data], t # Get the metadata associated with this revision of the object.
t=nv # All we really need is the vid, the non-version revid and the
data = self._record[oid+t][16:24] # pickle pointer revid.
return self._pickle[oid+data], t rec = self._metadata[oid+revid]
finally: self._lock_release() vid, nvrevid, lrevid = struct.unpack('>8s8s8s', rec[:24])
# If the object isn't living in a version, or if the version the
# object is living in is equal to the version that's being
# requested, then we can simply return the pickle referenced by
# the revid.
if vid == '\0'*8 or self._versions[vid] == version:
return self._pickle[oid+lrevid], revid
# Otherwise, we recognize that an object cannot be stored in more
# than one version at a time (although this may change if/when
# "Unlocked" versions are added). So we return the non-version
# revision of the object. BAW: should we assert that version is
# empty in this case?
lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickle[oid+lrevid], nvrevid
finally:
self._lock_release()
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
# Return the revision of the object with the given serial number.
self._lock_acquire() self._lock_acquire()
try: try:
data = self._record[oid+serial][16:24] # Get the pointer to the pickle (i.e. live revid, or lrevid)
return self._pickle[oid+data] # corresponding to the oid and the supplied serial
finally: self._lock_release() # a.k.a. revision.
lrevid = self._metadata[oid+serial][16:24]
return self._pickle[oid+lrevid]
finally:
self._lock_release()
def modifiedInVersion(self, oid): def __findcreatevid(self, version):
self._lock_acquire() # Get the vid associated with a version string, or create one if there
try: # is no vid for the version.
tid=self._current[oid] #
vid=self._record[oid+tid][:8] # First we look for the version in the Berkeley table. If not
if vid == '\0\0\0\0\0\0\0\0': return '' # present, then we look in the commit log to see if a new version
return self._versions[vid] # creation is pending. If still missing, then create the new version
finally: self._lock_release() # and add it to the commit log.
vid = self._vids.get(version)
def _newvid(self): if vid is None:
self._vid=self._vid+1 vid = self._commitlog.get_vid(version)
return utils.p64(self._vid) if vid is None:
self.__nextvid = self.__nextvid + 1
# Convert the int/long version ID into an 8-byte string
vid = utils.p64(self.__nextvid)
self._commitlog.write_new_version(version, vid)
return vid
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -171,322 +524,305 @@ class Full(Base): ...@@ -171,322 +524,305 @@ class Full(Base):
self._lock_acquire() self._lock_acquire()
try: try:
# Check for conflict errors. JF says: under some circumstances,
# it is possible that we'll get two stores for the same object in
# a single transaction. It's not clear though under what
# situations that can occur or what the semantics ought to be.
# For now, we'll assume this doesn't happen.
oserial = orevid = self._serials.get(oid)
if oserial is None:
# There's never been a previous revision of this object, so
# set its non-version revid to zero.
nvrevid = '\0'*8
elif serial <> oserial:
# The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial
# number. Raise a ConflictError.
raise POSException.ConflictError(
'serial number mismatch (was: %s, has: %s)' %
(oserial, utils.U64(serial)))
# Do we already know about this version? If not, we need to
# record the fact that a new version is being created. `version'
# will be the empty string when the transaction is storing on the
# non-version revision of the object.
if version: if version:
try: vid=self._vids[version] vid = self.__findcreatevid(version)
except:
vid=self._newvid()
self._tmp.newVersion(version, vid)
else:
vid=nv='\0\0\0\0\0\0\0\0'
if self._index.has_key(oid):
old=self._index[oid]
if serial != old: raise POSException.ConflictError
ovid, nv = unpack(">8s8s", self._record[oid+old][:16])
if ovid != vid:
raise POSException.VersionLockError, (`oid`, ovid)
if version and ovid == '\0\0\0\0\0\0\0\0': nv=old
else: else:
nv='\0\0\0\0\0\0\0\0' # vid 0 means no explicit version
vid = '\0'*8
self._tmp.store(oid, vid, nv, '', data, old) # A VersionLockError occurs when a particular object is being
# stored on a version different than the last version it was
finally: self._lock_release() # previously stored on (as long as the previous version wasn't
# zero, of course).
#
# Get the old version, which only makes sense if there was a
# previously stored revision of the object.
if orevid:
rec = self._metadata[oid+orevid]
ovid, onvrevid = struct.unpack('>8s8s', rec[:16])
if ovid == '\0'*8:
# The old revision's vid was zero any version is okay.
# But if we're storing this on a version, then the
# non-version revid will be the previous revid for the
# object.
if version:
nvrevid = orevid
elif ovid <> vid:
# The old revision was on a version different than the
# current version. That's a no no.
raise POSException.VersionLockError(
'version mismatch for object %s (was: %s, got: %s)' %
(oid, ovid, vid))
# Record the update to this object in the commit log.
self._commitlog.write_object(oid, vid, nvrevid, data, oserial)
finally:
self._lock_release()
# Return our cached serial number for the object.
return serial return serial
def supportsUndo(self): return 1 def _decref(self, oid, lrevid, txn):
def supportsVersions(self): return 1 # Decref the reference count of the pickle pointed to by oid+lrevid.
# If the reference count goes to zero, we can garbage collect the
def _finish(self, tid, u, d, e): # pickle, and decref all the object pointed to by the pickle (with of
txn=self._env.txn_begin() # course, cascading garbage collection).
try: key = oid + lrevid
tmp=self._tmp refcount = self._pickleReferenceCount.get(key, txn=txn)
ltmp=tmp.tell() refcount = utils.U64(refcount) - 1
if not ltmp: return if refcount > 0:
load=marshal.load self._pickleReferenceCount.put(key, utils.p64(refcount), txn=txn)
tid=self._serial return
records_put=self._records.put # The refcount of this pickle has gone to zero, so we need to garbage
pickles_put=self._pickle.put # collect it, and decref all the objects it points to.
current_put=self._current.put self._pickleReferenceCount.delete(key, txn=txn)
transaction_oids_put=self._transaction_oids.put pickle = self._pickles.get(key, txn=txn)
currentVersions_put=self._currentVersions.put collectedOids = []
l=pack(">HI",len(u), len(d)) for roid in referencesf(pickle, []):
self._transactions.put(tid, ' '+l+u+d+e, txn) refcount = self._refcounts.get(roid, txn=txn)
while ltmp: refcount = utils.U64(refcount) - 1
try: op, arg = load(tmp) if refcount > 0:
except EOFError: self._refcounts.put(roid, utils.p64(refcount), txn=txn)
if tmp.tell()==ltmp: ltmp=0 else:
else: raise # This sucker gets garbage collected itself.
else: self._refcounts.delete(roid, txn=txn)
if op=='s': collectedOids.append(roid)
# store data # Now, for all the objects whose refcounts just went to zero, we need
oid, vid, nv, back, data, pre = arg # to recursively decref their pickles.
key=oid+tid for roid in collectedOids:
if data: serial = self._serials.get(roid, txn=txn)
pickles_pud(key, data, txn) # The pickle for this metadata record is pointed to by lrevid
data=tid lrevid = self._metadata.get(roid+serial, txn=txn)[16:24]
else: # Now recurse...
data=back self._decref(roid, lrevid, txn)
records_put(key, vid+nv+data+pre, txn)
if vid != '/0/0/0/0/0/0/0/0':
versions_put(vid, oid, txn)
current_put(oid, tid, txn)
transaction_oids_put(tid, oid, txn)
elif op='d':
# discard a version (part of version commit and abort)
self._currentVersions.delete(arg, txn)
elif op='v':
# save a version definition
vid, version = arg
self._versions.put(vid, version, txn)
self._vids.put(version, vid, txn)
except:
txn.abort()
raise
else:
txn.commit()
def _undoable(self, txn):
txn.abort()
raise POSException.UndoError, 'Undoable transaction'
def undo(self, tid): def undo(self, tid):
# Attempt to undo transaction. NOTE: the current storage interface
# documentation says that this method takes a third argument, which is
# a limit on the number of oids to return. JF says, "Let's get rid of
# the third argument."
c = None # txnOids cursor
oids = []
self._lock_acquire() self._lock_acquire()
try: try:
status = self._transactions[tid][:1] # Make sure the transaction is undoable. If this transaction
if status == 'p': # occurred earlier than a pack operation, it is no longer
raise POSException.UndoError, 'Undoable transaction' # undoable. The status flag indicates its undoability.
status = self._txnMetadata[tid][1]
txn=self._env.txn_begin() if status == PROTECTED_TRANSACTION:
raise POSException.UndoError, 'Transaction cannot be undone'
current=self._current # Create the cursor and begin the transaction
record=self._record zero = '\0'*8
pickle=self._pickle txn = self._env.txn_begin()
currentVersions=self._currentVersions c = self._txnOids.cursor()
unpack=struct.unpack
try: try:
for oid in dups(self._transaction_oids raise POSException.UndoError, 'Undoable transaction' rec = c.set(tid)
while rec:
txn=self._env.txn_begin() oid = rec[1]
oids.append(oid)
current=self._current # Make sure the tid is current
record=self._record if self._serials.get(oid, txn=txn) <> tid:
pickle=self._pickle # BAW: we can only undo the most current revision of
currentVersions=self._currentVersions # the object???
unpack=struct.unpack raise POSException.UndoError(
"Not object's current transaction")
try: key = oid + tid
for oid in dups(self._transaction_oids, tid, txn): # Get the metadata for this object revision, and then
if current.get(oid, txn) != tid: self._undoable(txn) # delete the metadata record.
key=oid+tid vid, nvrevid, lrevid, prevrevid = struct.unpack(
vid, nv, data, pre = unpack("8s8s8s8s", '8s8s8s8s', self._metadata.get(key, txn=txn))
record.get(key, txn)) self._metadata.delete(key, txn=txn)
record.delete(key, txn) # Decref the reference count of the pickle that we're
if data==tid: pickle.delete(key, txn) # pointing to and garbage collect it if the refcount falls
if pre == '\0\0\0\0\0\0\0\0': # to zero.
current.delete(oid, txn) self._decref(oid, lrevid, txn)
# If the prevrevid is zero, then we've just undone the
# creation of this object, so we can get rid of its
# serials record. Otherwise, update the serials record to
# point to the previous revision of the object.
if prevrevid == zero:
self._serials.delete(oid, txn=txn)
else: else:
current.put(oid, pre, txn) self._serials.put(oid, prevrevid, txn=txn)
try: pvid=record.get(oid+pre, txn) prec = self._metadata.get(oid+prevrevid, txn=txn)
except KeyError: self._undoable(txn) # BAW: what does it mean if the metadata for the
if pvid != vid: # previous revision of the object doesn't exist???
if vid != '\0\0\0\0\0\0\0\0': if not prec:
del_dup(currentVersions, vid, oid, txn) raise POSException.UndoError(
if pvid != '\0\0\0\0\0\0\0\0': "No previous revision for object")
currentVersions.put(pvid, oid, txn) pvid = prec[:8]
# If the version for the previous revision of the
self._transactions.delete(tid, txn) # object is different than the current revision of the
self._transaction_oids.delete(tid, txn) # object, then we're undoing past a version creation,
# so we can delete the entry for this vid/oid pair in
# the currentVersions table.
if pvid <> vid:
# Don't delete the non-version revision of the
# object.
if vid <> zero:
tmpc = self._currentVersions.cursor(txn=txn)
try:
rec = tmpc.get_both(vid, oid)
if rec:
tmpc.delete()
finally:
tmpc.close()
if pvid <> zero:
# Make the previous version the current one
self._currentVersions.put(pvid, oid, txn=txn)
# Finally, delete the transaction metadata associated with
# the transaction we just undid.
self._txnMetadata.delete(tid, txn=txn)
self._txnOids.delete(tid, txn=txn)
except: except:
txn.abort() txn.abort()
raise raise
else: else:
txn.commit() txn.commit()
return oids
finally: self._lock_release() finally:
if c:
c.close()
self._lock_release()
def undoLog(self, first, last, filter=None): def undoLog(self, first, last, filter=None):
# Get a list of transaction ids that can be undone, based on the
# determination of the filter. filter is a function which takes a
# transaction id and returns true or false.
#
# Note that this method has been deprecated by undoInfo() which itself
# has some flaws, but is the best we have now. We don't actually need
# to implement undoInfo() because BaseStorage (which we eventually
# inherit from) mixes in the UndoLogCompatible class which provides an
# implementation written in terms of undoLog().
#
c = None # tnxMetadata cursor
txnDescriptions = [] # the return value
i = 0 # first <= i < last
self._lock_acquire() self._lock_acquire()
try: try:
c=self._transactions.cursor() c = self._txnMetadata.cursor()
try: # We start at the last transaction and scan backwards because we
i=0; r=[]; a=r.append # can stop early if we find a transaction that is earlier than a
data=c.get(db.DB_LAST) # pack. We still have the potential to scan through all the
while data and i < last: # transactions.
tid, data = data rec = c.get_last()
status = data[:1] while rec and i < last:
if status == 'p': break tid, data = rec
luser, ldesc = unpack("HI", data[1:17]) status = data[0]
user=data[17:luser+17] if status == PROTECTED_TRANSACTION:
desc=data[luser+17:luser+17+ldesc] break
ext=data[luser+17+ldesc:] userlen, desclen = struct.unpack('>II', data[1:17])
user = data[17:17+userlen]
data={'id': tid, desc = data[17+userlen:17+userlen+desclen]
'time': TimeStamp(tid).timeTime(), ext = data[17+userlen+desclen:]
'user_name': user or '', # Create a dictionary for the TransactionDescription
'description': desc or '', txndesc = {'id' : tid,
} 'time' : TimeStamp(tid).timeTime(),
if ext: 'user_name' : user,
try: 'description': desc,
ext=loads(ext) }
data.update(ext) # The extension stuff is a picklable mapping, so if we can
except: pass # unpickle it, we update the TransactionDescription dictionary
# with that data. BAW: The bare except is moderately
if filter is None or filter(data): # disgusting, but I'm too lazy to figure out what exceptions
if i >= first: a(data) # could actually be raised here...
i=i+1 if ext:
try:
data=c.get(db.DB_PREV) txndesc.update(pickle.loads(ext))
except:
return r pass
# Now call the filter to see if this transaction should be
finally: c.close() # added to the return list...
finally: self._lock_release() if filter is None or filter(txndesc):
# ...and see if this is within the requested ordinals
if i >= first:
txnDescriptions.append(txndesc)
i = i + 1
# And get the previous record
rec = c.get_prev()
return txnDescriptions
finally:
if c:
c.close()
self._lock_release()
def versionEmpty(self, version): def versionEmpty(self, version):
# Return true if version is empty.
self._lock_acquire() self._lock_acquire()
try: try:
try: self._currentVersions[self._vids[version]] # Let these KeyError exceptions percolate up
except KeyError: return 1 vid = self._vids[version]
else: return 0 # But catch these, because it means the version is empty
finally: self._lock_release() if self._currentVersions.has_key(vid):
return 1
else:
return 0
finally:
self._lock_release()
def versions(self, max=None): def versions(self, max=None):
# Return the list of current versions, as strings, up to the maximum
# requested.
self._lock_acquire() self._lock_acquire()
c = None
try: try:
c=self._currentVersions.cursor() c = self._currentVersions.cursor()
try: rec = c.first()
try: data=c.get(db.DB_NEXT_NODUP) retval = []
except: return () while rec and (max is None or max > 0):
r=[]; a=r.append # currentVersions maps vids to [oid]'s so dig the key out of
while data: # the returned record and look the vid up in the
a(data[0]) # vids->versions table.
data=c.get(db.DB_NEXT_NODUP) retval.append(self._versions[rec[0]])
# Since currentVersions has duplicates (i.e. multiple vid keys
return r # with different oids), get the next record that has a
# different key than the current one.
finally: c.close() rec = c.next_nodup()
finally: self._lock_release() if max is not None:
max = max - 1
return retval
finally:
if c:
c.close()
self._lock_release()
def history(self, oid, version=None, length=1, filter=None): def history(self, oid, version=None, length=1, filter=None):
# FIXME
self._lock_acquire() self._lock_acquire()
try: try:
tid=self._current[oid] tid=self._current[oid]
finally: self._lock_release() finally: self._lock_release()
def pack(self, t, referencesf): def pack(self, t, referencesf):
# FIXME
self._lock_acquire() self._lock_acquire()
try: try:
pass
finally: self._lock_release() finally: self._lock_release()
# Other interface assertions
def supportsUndo(self):
return 1
class dups: def supportsVersions(self):
"""Iterator for duplicate-record databases" return 1
def __init__(self, db, key, txn=0):
if txn==0:
c=db.cursor()
else:
c=db.cursor(txn)
self._v=c.set(key)
self._g=c.get
self._i=0
def __getitem__(self, index):
i=self._i
if index==i: return self._v
if index < i or i < 0: raise IndexError, index
while i < index:
v=self._g(db.DB_NEXT_DUP)
if v:
i=i+1
else:
self._i=-1
raise IndexError, index
self._i=i
self._v=v
return v
def del_dup(database, key, value, txn):
c=database.cursor(txn)
try:
c.getBoth(key, value)
c.delete()
finally:
c.close()
class Log:
def __init__(self, file):
self._file=file
file.seek(0)
h=file.read(5)
size=0
if len(h) == 5:
state=h[0]
if state=='c':
size=unpack(">i", h[1:])
else:
state='s'
file.seek(0)
file.write('s\0\0\0\0')
self._state=state
self._size=size
def clear(self):
if self._state=='p':
raise "Can't clear state with uncommitted promised log"
self._file.seek(0)
self._file.write('s')
self._size=0
def promise(self):
self._state='p'
size=self._file.tell()-5
self._file.seek(0)
self._file.write('p'+pack(">i", size))
self._size=size
def commit(self):
file=self._file
l=file.tell()-1
if l:
self._file.seek(0)
self._file.write('c')
self._state='c'
def store(self, oid, vid, nv, dataptr, pickle, previous,
dump=marshal.dump):
dump(('s',(oid,vid,nv,data,pickle,previous)), self._file)
def storeNV(self, oid, data, tid,
dump=marshal.dump, zero='\0\0\0\0\0\0\0\0'):
dump(('s',(oid,zero,zero,data,'',tid)), self._file)
def versionDiscard(self, vid,
dump=marshal.dump):
dump(('d',(vid)), self._file)
def newVersion(self, version, vid,
dump=marshal.dump):
dump(('v',(version, vid)), self._file)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment