Commit ea0016e4 authored by Jeremy Hylton's avatar Jeremy Hylton

Use new fspack.

parent 3ace55e7
......@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.130 $'[11:-2]
__version__='$Revision: 1.131 $'[11:-2]
import base64
from cPickle import Pickler, Unpickler, loads
......@@ -135,6 +135,7 @@ from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
from ZODB.TimeStamp import TimeStamp
from ZODB.lock_file import LockFile
from ZODB.utils import p64, u64, cp, z64
from ZODB.fspack import FileStoragePacker
try:
from ZODB.fsIndex import fsIndex
......@@ -1430,382 +1431,57 @@ class FileStorage(BaseStorage.BaseStorage,
Also, data back pointers that point before packtss are resolved and
the associated data are copied, since the old records are not copied.
"""
if self._is_read_only:
raise POSException.ReadOnlyError()
# Ugh, this seems long
packing=1 # are we in the packing phase (or the copy phase)
locked=0
_lock_acquire=self._lock_acquire
_lock_release=self._lock_release
_commit_lock_acquire=self._commit_lock_acquire
_commit_lock_release=self._commit_lock_release
index, vindex, tindex, tvindex = self._newIndexes()
name=self.__name__
file=open(name, 'rb')
stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
if stop==z64: raise FileStorageError, 'Invalid pack time'
# If the storage is empty, there's nothing to do.
if not self._index:
return
# Record pack time so we don't undo while packing
_lock_acquire()
self._lock_acquire()
try:
if self._packt != z64:
# Already packing.
raise FileStorageError, 'Already packing'
self._packt = stop
self._packt = None
finally:
_lock_release()
self._lock_release()
p = FileStoragePacker(self._file_name, stop,
self._lock_acquire, self._lock_release,
self._commit_lock_acquire,
self._commit_lock_release)
try:
##################################################################
# Step 1, get index as of pack time that
# includes only referenced objects.
packpos, maxoid, ltid = read_index(
file, name, index, vindex, tindex, stop,
read_only=1,
)
if packpos == 4:
return
if self._redundant_pack(file, packpos):
opos = p.pack()
if opos is None:
return
rootl=[z64]
pop=rootl.pop
pindex=fsIndex()
referenced=pindex.has_key
_load=self._load
_loada=self._loada
v=None
while rootl:
oid=pop()
if referenced(oid): continue
try:
p, v, nv = _loada(oid, index, file)
referencesf(p, rootl)
if nv:
p, serial = _load(oid, '', index, file)
referencesf(p, rootl)
pindex[oid]=index[oid]
except KeyError:
pindex[oid]=0
error('Bad reference to %s', `(oid,v)`)
# XXX This try/except frequently masks bugs in the
# implementation.
##################################################################
# Step 2, copy data and compute new index based on new positions.
index, vindex, tindex, tvindex = self._newIndexes()
ofile=open(name+'.pack', 'w+b')
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
nvindex=fsIndex()
# Cache a bunch of methods
seek=file.seek
read=file.read
oseek=ofile.seek
write=ofile.write
index_get=index.get
vindex_get=vindex.get
pindex_get=pindex.get
# Initialize,
pv=z64
offset=0L # the amount of space freed by packing
pos=opos=4L
oseek(0)
write(packed_version)
# Copy the data in two stages. In the packing stage,
# we skip records that are non-current or that are for
# unreferenced objects. We also skip undone transactions.
#
# After the packing stage, we copy everything but undone
# transactions, however, we have to update various back pointers.
# We have to have the storage lock in the second phase to keep
# data from being changed while we're copying.
pnv=None
while 1:
# Check for end of packed records
if packing and pos >= packpos:
# OK, we're done with the old stuff, now we have
# to get the lock so we can copy the new stuff!
offset=pos-opos
if offset <= 0:
# we didn't free any space, there's no point in
# continuing
ofile.close()
file.close()
os.remove(name+'.pack')
return
packing=0
_commit_lock_acquire()
_lock_acquire()
locked=1
self._packt=None # Prevent undo until we're done
# Read the transaction record
seek(pos)
h=read(TRANS_HDR_LEN)
if len(h) < TRANS_HDR_LEN: break
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
if status=='c':
# Oops. we found a checkpoint flag.
break
tl=u64(stl)
tpos=pos
tend=tpos+tl
if status=='u':
if not packing:
# We rely below on a constant offset for unpacked
# records. This assumption holds only if we copy
# undone unpacked data. This is lame, but necessary
# for now to squash a bug.
write(h)
tl=tl+8
write(read(tl-TRANS_HDR_LEN))
opos=opos+tl
# Undone transaction, skip it
pos=tend+8
continue
otpos=opos # start pos of output trans
# write out the transaction record
status=packing and 'p' or ' '
write(h[:16]+status+h[17:])
thl=ul+dl+el
h=read(thl)
if len(h) != thl:
raise PackError(opos)
write(h)
thl=TRANS_HDR_LEN+thl
pos=tpos+thl
opos=otpos+thl
while pos < tend:
# Read the data records for this transaction
seek(pos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(
DATA_HDR, h)
plen=u64(splen)
dlen=DATA_HDR_LEN+(plen or 8)
if vlen:
dlen=dlen+(16+vlen)
if packing and pindex_get(oid, 0) != pos:
# This is not the most current record, or
# the oid is no longer referenced so skip it.
pos=pos+dlen
continue
pnv=u64(read(8))
# skip position of previous version record
seek(8,1)
version=read(vlen)
pv=p64(vindex_get(version, 0))
vindex[version]=opos
else:
if packing:
ppos=pindex_get(oid, 0)
if ppos != pos:
if not ppos:
# This object is no longer referenced
# so skip it.
pos=pos+dlen
continue
# This is not the most current record
# But maybe it's the most current committed
# record.
seek(ppos)
ph=read(DATA_HDR_LEN)
pdoid,ps,pp,pt,pvlen,pplen = unpack(
DATA_HDR, ph)
if not pvlen:
# The most current record is committed, so
# we can toss this one
pos=pos+dlen
continue
pnv=read(8)
pnv=_loadBackPOS(file, oid, pnv)
if pnv > pos:
# The current non version data is later,
# so this isn't the current record
pos=pos+dlen
continue
# Ok, we've gotten this far, so we have
# the current record and we're ready to
# read the pickle, but we're in the wrong
# place, after wandering around to figure
# out is we were current. Seek back
# to pickle data:
seek(pos+DATA_HDR_LEN)
nvindex[oid]=opos
tindex[oid]=opos
opos=opos+dlen
pos=pos+dlen
if plen:
p=read(plen)
else:
p=read(8)
if packing:
# When packing we resolve back pointers!
p, serial = _loadBack(file, oid, p)
plen=len(p)
opos=opos+plen-8
splen=p64(plen)
else:
p=u64(p)
if p < packpos:
# We have a backpointer to a
# non-packed record. We have to be
# careful. If we were pointing to a
# current record, then we should still
# point at one, otherwise, we should
# point at the last non-version record.
ppos=pindex_get(oid, 0)
if ppos:
if ppos==p:
# we were pointing to the
# current record
p=index[oid]
else:
p=nvindex[oid]
else:
# Oops, this object was modified
# in a version in which it was deleted.
# Hee hee. It doesn't matter what we
# use cause it's not reachable any more.
p=0
else:
# This points back to a non-packed record.
# Just adjust for the offset
p=p-offset
p=p64(p)
sprev=p64(index_get(oid, 0))
write(pack(DATA_HDR,
oid,serial,sprev,p64(otpos),vlen,splen))
if vlen:
if not pnv:
write(z64)
else:
if pnv < packpos:
# we need to point to the packed
# non-version rec
pnv=nvindex[oid]
else:
# we just need to adjust the pointer
# with the offset
pnv=pnv-offset
write(p64(pnv))
write(pv)
write(version)
write(p)
# skip the (intentionally redundant) transaction length
pos=pos+8
if locked:
# temporarily release the lock to give other threads
# a chance to do some work!
_commit_lock_release()
_lock_release()
locked=0
index.update(tindex) # Record the position
tindex.clear()
# Now, maybe we need to hack or delete the transaction
otl=opos-otpos
if otl != tl:
# Oops, what came out is not what came in!
# Check for empty:
if otl==thl:
# Empty, slide back over the header:
opos=otpos
oseek(opos)
else:
# Not empty, but we need to adjust transaction length
# and update the status
oseek(otpos+8)
otl=p64(otl)
write(otl+status)
oseek(opos)
write(otl)
opos=opos+8
else:
write(p64(otl))
opos=opos+8
if not packing:
# We are in the copying phase. We need to get the lock
# again to avoid someone writing data while we read it.
_commit_lock_acquire()
_lock_acquire()
locked=1
# OK, we've copied everything. Now we need to wrap things
# up.
# Hack the files around.
name=self.__name__
ofile.flush()
ofile.close()
file.close()
oldpath = self._file_name + ".old"
self._file.close()
try:
if os.path.exists(name+'.old'):
os.remove(name+'.old')
os.rename(name, name+'.old')
except:
# Waaa
self._file=open(name,'r+b')
if os.path.exists(oldpath):
os.remove(oldpath)
os.rename(self._file_name, oldpath)
except Exception, msg:
self._file = open(self._file_name, 'r+b')
raise
# OK, we're beyond the point of no return
os.rename(name+'.pack', name)
self._file=open(name,'r+b')
self._initIndex(index, vindex, tindex, tvindex)
self._pos=opos
os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b')
self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
self._pos = opos
self._save_index()
finally:
if locked:
_commit_lock_release()
_lock_release()
_lock_acquire()
self._packt=z64
_lock_release()
if p.locked:
self._commit_lock_release()
self._lock_release()
self._lock_acquire()
self._packt = z64
self._lock_release()
def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment