FileStorage.py 71.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
##############################################################################
# 
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
# 
# Copyright (c) Digital Creations.  All rights reserved.
# 
# This license has been certified as Open Source(tm).
# 
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
# 
# 1. Redistributions in source code must retain the above copyright
#    notice, this list of conditions, and the following disclaimer.
# 
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions, and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
# 
# 3. Digital Creations requests that attribution be given to Zope
#    in any manner possible. Zope includes a "Powered by Zope"
#    button that is installed by default. While it is not a license
#    violation to remove this button, it is requested that the
#    attribution remain. A significant investment has been put
#    into Zope, and this effort will continue if the Zope community
#    continues to grow. This is one way to assure that growth.
# 
# 4. All advertising materials and documentation mentioning
#    features derived from or use of this software must display
#    the following acknowledgement:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    In the event that the product being advertised includes an
#    intact Zope distribution (with copyright and license included)
#    then this clause is waived.
# 
# 5. Names associated with Zope or Digital Creations must not be used to
#    endorse or promote products derived from this software without
#    prior written permission from Digital Creations.
# 
# 6. Modified redistributions of any form whatsoever must retain
#    the following acknowledgment:
# 
#      "This product includes software developed by Digital Creations
#      for use in the Z Object Publishing Environment
#      (http://www.zope.org/)."
# 
#    Intact (re-)distributions of any official Zope release do not
#    require an external acknowledgement.
# 
# 7. Modifications are encouraged but must be packaged separately as
#    patches to official Zope releases.  Distributions that do not
#    clearly separate the patches from the original work must be clearly
#    labeled as unofficial distributions.  Modifications which do not
#    carry the name Zope may be packaged in any form, as long as they
#    conform to all of the clauses above.
# 
# 
# Disclaimer
# 
#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
#   SUCH DAMAGE.
# 
# 
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations.  Specific
# attributions are listed in the accompanying credits file.
# 
##############################################################################
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
# 
#  File-based ZODB storage
# 
# Files are arranged as follows.
# 
#   - The first 4 bytes are a file identifier.
#   
#   - The rest of the file consists of a sequence of transaction
#     "records".
# 
# A transaction record consists of:
# 
#   - 8-byte transaction id, which is also a time stamp.
#   
#   - 8-byte transaction record length - 8.
#   
#   - 1-byte status code
#   
#   - 2-byte length of user name
#   
#   - 2-byte length of description 
#   
107
#   - 2-byte length of extension attributes 
108 109 110 111
#   
#   -   user name
#   
#   -   description
112 113
#
#   -   extension attributes
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
# 
#   * A sequence of data records
#   
#   - 8-byte redundant transaction length -8
# 
# A data record consists of
# 
#   - 8-byte oid.
# 
#   - 8-byte serial, which is a time stamp that matches the
#     transaction timestamp.
# 
#   - 8-byte previous-record file-position.
# 
#   - 8-byte beginning of transaction record file position.
# 
#   - 2-byte version length
# 
#   - 8-byte data length
# 
#   ? 8-byte position of non-version data
#     (if version length > 0)
# 
#   ? 8-byte position of previous record in this version
#     (if version length > 0)
# 
#   ?   version string 
#     (if version length > 0)
# 
#   ?   data
#     (data length > 0)
# 
#   ? 8-byte position of data record containing data
#     (data length == 0)
# 
# Note that the lengths and positions are all big-endian.
# Also, the object ids and time stamps are big-endian, so comparisons
# are meaningful.
# 
# Version handling
# 
#   There isn't a separate store for versions.  Each record has a
#   version field, indicating what version it is in.  The records in a
#   version form a linked list.  Each record that has a non-empty
#   version string has a pointer to the previous record in the version.
#   Version back pointers are retained *even* when versions are
#   committed or aborted or when transactions are undone.
# 
#   There is a notion of "current" version records, which are the
#   records in a version that are the current records for their
#   respective objects.  When a version is committed, the current records
#   are committed to the destination version.  When a version is
#   aborted, the current records are aborted.
# 
#   When committing or aborting, we search backward through the linked
#   list until we find a record for an object that does not have a
#   current record in the version.  If we find a record for which the
#   non-version pointer is the same as the previous pointer, then we
#   forget that the corresponding object had a current record in the
#   version. This strategy allows us to avoid searching backward through
#   previously committed or aborted version records.
# 
#   Of course, we ignore records in undone transactions when committing
#   or aborting.
#
# Backpointers
#
#   When we commit or abort a version, we don't copy (or delete)
#   any data.  Instead, we write records with back pointers.
#
#   A version record *never* has a back pointer to a non-version
#   record, because we never abort to a version.  A non-version record
#   may have a back pointer to a version record or to a non-version
#   record.
#
189
# Revision number sliced out of the RCS/CVS "$Revision: ... $" keyword.
__version__='$Revision: 1.68 $'[11:-2]
Jim Fulton's avatar
alpha1  
Jim Fulton committed
190

191
import struct, time, os, bpthread, string, base64, sys
Jim Fulton's avatar
alpha1  
Jim Fulton committed
192
from struct import pack, unpack
193
from cPickle import loads
194
import POSException
Jeremy Hylton's avatar
Jeremy Hylton committed
195
from POSException import UndoError
Jim Fulton's avatar
Jim Fulton committed
196
from TimeStamp import TimeStamp
197
from lock_file import lock_file
198
from utils import t32, p64, U64, cp
199 200
from zLOG import LOG, WARNING, ERROR, PANIC, register_subsystem
register_subsystem('ZODB FS')
201
import BaseStorage
202
from cPickle import Pickler, Unpickler
203
import ConflictResolution
Jim Fulton's avatar
Jim Fulton committed
204

205 206 207
# fsync is taken from the posix module when that module can be imported.
# Otherwise it is set to None and callers must test for that before
# calling it.
try:
    from posix import fsync
except ImportError:
    fsync = None

Jeremy Hylton's avatar
Jeremy Hylton committed
208
from types import StringType
209

Jim Fulton's avatar
Jim Fulton committed
210 211
# Eight NUL bytes: the packed form of a zero 8-byte length or position.
z64='\0'*8

212 213
def warn(message, *data):
    """Log a WARNING-level message for the 'ZODB FS' subsystem.

    message is a %-style format string and data supplies its
    arguments.  The logged text is prefixed with packed_version.
    """
    text = message % data
    LOG('ZODB FS', WARNING, "%s  warn: %s\n" % (packed_version, text))
Jim Fulton's avatar
Jim Fulton committed
214

215 216
def error(message, *data):
    """Log an ERROR-level message for the 'ZODB FS' subsystem.

    message is a %-style format string and data supplies its
    arguments.  The logged text is prefixed with packed_version.
    """
    text = message % data
    LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version, text))
Jim Fulton's avatar
Jim Fulton committed
217

218 219 220
def nearPanic(message, *data):
    """Log a PANIC-level message without raising an exception.

    message is a %-style format string and data supplies its
    arguments.  The logged text is prefixed with packed_version.
    """
    text = message % data
    LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version, text))

221
def panic(message, *data):
    """Log a PANIC-level message and raise CorruptedTransactionError.

    message is a %-style format string and data supplies its
    arguments.  The formatted text is both logged (prefixed with
    packed_version) and used as the exception argument.
    """
    text = message % data
    LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version, text))
    raise CorruptedTransactionError(text)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
225

226
# Common base class of the FileStorage-specific exceptions in this module.
class FileStorageError(POSException.StorageError): pass
Jim Fulton's avatar
alpha1  
Jim Fulton committed
227

228
class FileStorageFormatError(FileStorageError):
    """Invalid file format

    Signals that the given file does not have a valid format.
    """

class CorruptedFileStorageError(FileStorageError,
                                POSException.StorageSystemError):
    """Corrupted file storage

    Base class of the more specific corruption errors
    CorruptedTransactionError and CorruptedDataError.
    """

# Corruption error raised by panic().
class CorruptedTransactionError(CorruptedFileStorageError): pass
# Raised when the oid stored in a data record is not the oid expected.
class CorruptedDataError(CorruptedFileStorageError): pass

242
class FileStorageQuotaError(FileStorageError,
                            POSException.StorageSystemError):
    """File storage quota exceeded

    Signals that the storage would grow past its configured quota.
    """

247
# File identifier written at the start of a newly created data file;
# also used as the prefix of this module's log messages.
packed_version='FS21'
Jim Fulton's avatar
Jim Fulton committed
248

249 250
class FileStorage(BaseStorage.BaseStorage,
                  ConflictResolution.ConflictResolvingStorage):
251
    _packt=z64
Jim Fulton's avatar
Jim Fulton committed
252

253 254
    def __init__(self, file_name, create=0, read_only=0, stop=None,
                 quota=None):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
255

256 257
        if not os.path.exists(file_name): create = 1

Jim Fulton's avatar
Jim Fulton committed
258
        if read_only:
259
            if create: raise ValueError, "can\'t create a read-only file"
Jim Fulton's avatar
Jim Fulton committed
260 261 262 263
        elif stop is not None:
            raise ValueError, "time-travel is only supported in read-only mode"

        if stop is None: stop='\377'*8
264

265
        # Lock the database and set up the temp file.
266 267 268 269 270 271 272 273 274
        if not read_only:
            try: f=open(file_name+'.lock', 'r+')
            except: f=open(file_name+'.lock', 'w+')
            lock_file(f)
            try:
                f.write(str(os.getpid()))
                f.flush()
            except: pass
            self._lock_file=f # so it stays open
275

276 277 278 279 280 281
            self._tfile=open(file_name+'.tmp','w+b')

        else:

            self._tfile=None

282 283
        self._file_name=file_name

284 285
        BaseStorage.BaseStorage.__init__(self, file_name)

286
        index, vindex, tindex, tvindex = self._newIndexes()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
287

288 289
        self._initIndex(index, vindex, tindex, tvindex)
        
Jim Fulton's avatar
alpha1  
Jim Fulton committed
290 291 292
        # Now open the file
        
        if create:
293 294
            if os.path.exists(file_name):
                os.remove(file_name)
Jim Fulton's avatar
Jim Fulton committed
295
            file=open(file_name,'w+b')
296 297 298
            file.write(packed_version)
        else:
            file=open(file_name, read_only and 'rb' or 'r+b')
Jim Fulton's avatar
Jim Fulton committed
299

Jim Fulton's avatar
alpha1  
Jim Fulton committed
300
        self._file=file
301 302 303 304 305 306 307 308

        r=self._restore_index()
        if r:
            index, vindex, start, maxoid, ltid = r
            self._initIndex(index, vindex, tindex, tvindex)
            self._pos, self._oid, tid = read_index(
                file, file_name, index, vindex, tindex, stop,
                ltid=ltid, start=start, maxoid=maxoid,
Jim Fulton's avatar
Jim Fulton committed
309
                read_only=read_only,
310
                )
311
        else:
312
            self._pos, self._oid, tid = read_index(
Jim Fulton's avatar
Jim Fulton committed
313 314 315
                file, file_name, index, vindex, tindex, stop,
                read_only=read_only,
                )
Jim Fulton's avatar
Jim Fulton committed
316 317 318 319

        self._ts=tid=TimeStamp(tid)
        t=time.time()
        t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
320 321 322 323 324
        if tid > t:
            warn("%s Database records in the future", file_name);
            if tid.timeTime() - t.timeTime() > 86400*30:
                # a month in the future? This is bogus, use current time
                self._ts=t
325 326

        self._quota=quota
Jim Fulton's avatar
Jim Fulton committed
327
            
Jim Fulton's avatar
alpha1  
Jim Fulton committed
328

329 330 331 332 333 334 335 336
    def _initIndex(self, index, vindex, tindex, tvindex):
        self._index=index
        self._vindex=vindex
        self._tindex=tindex
        self._tvindex=tvindex
        self._index_get=index.get
        self._vindex_get=vindex.get

Jim Fulton's avatar
alpha1  
Jim Fulton committed
337 338
    def __len__(self): return len(self._index)

339
    def _newIndexes(self): return {}, {}, {}, {}
340 341
        
    def abortVersion(self, src, transaction):
        """Abort version src within transaction.

        Implemented by commitVersion with an empty destination and
        abort set; returns whatever commitVersion returns.
        """
        no_destination = ''
        return self.commitVersion(src, no_destination, transaction, abort=1)
Jim Fulton's avatar
Jim Fulton committed
343

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
    def _save_index(self):
        """Write the database index to a file to support quick startup
        """
        
        index_name=self.__name__+'.index'
        tmp_name=index_name+'.index_tmp'

        f=open(tmp_name,'wb')
        p=Pickler(f,1)

        info={'index': self._index, 'pos': self._pos,
              'oid': self._oid, 'vindex': self._vindex}

        p.dump(info)
        f.flush()
        f.close()
        try:
361 362 363 364
            try:
                os.remove(index_name)
            except OSError:
                pass
365 366 367
            os.rename(tmp_name, index_name)
        except: pass

368 369 370
    def _clear_index(self):
        index_name=self.__name__+'.index'
        if os.path.exists(index_name):
371 372 373 374
            try:
                os.remove(index_name)
            except OSError:
                pass
375

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
    def _sane(self, index, pos):
        """Sanity check saved index data by reading the last not-undone trans

        Basically, we read the last not undone transaction and
        check to see that the included records are consistent
        with the index.  Any invalid record records or inconsistent
        object positions cause zero to be returned.

        index -- the saved mapping from oid to file position.
        pos -- the saved file position; it must not lie beyond the
            end of the data file.

        Returns the id of the transaction record that ends at pos when
        the check succeeds, and 0 otherwise.
        """
        if pos < 100: return 0
        file = self._file
        seek = file.seek
        read = file.read
        seek(0, 2)
        if file.tell() < pos: return 0
        ltid = None

        while 1:
            # The trailing length we are about to read occupies the 8
            # bytes before pos and must lie after the 4-byte file
            # identifier.  Once we have searched back past the first
            # transaction (everything was undone or empty) there is
            # nothing left to check against, and seeking would use a
            # negative offset.
            if pos < 12: return 0
            seek(pos-8)
            rstl = read(8)
            tl = U64(rstl)
            pos = pos-tl-8
            if pos < 4: return 0
            seek(pos)
            tid, stl, status, ul, dl, el = unpack(">8s8scHHH", read(23))
            if not ltid: ltid = tid
            if stl != rstl: return 0 # inconsistent lengths
            if status == 'u': continue # undone trans, search back
            if status not in ' p': return 0
            if tl < (23+ul+dl+el): return 0
            tend = pos+tl
            opos = pos+(23+ul+dl+el)
            if opos == tend: continue # empty trans

            while opos < tend:
                # Read the data records for this transaction
                seek(opos)
                h = read(42)
                oid, serial, sprev, stloc, vlen, splen = unpack(
                    ">8s8s8s8sH8s", h)
                tloc = U64(stloc)
                plen = U64(splen)

                # A record without data carries an 8-byte back pointer.
                dlen = 42+(plen or 8)
                if vlen: dlen = dlen+(16+vlen)

                if opos+dlen > tend or tloc != pos: return 0

                if index.get(oid, 0) != opos: return 0

                opos = opos+dlen

            return ltid

    def _restore_index(self):
        """Load the database index from a file to support quick startup
        """
        file_name=self.__name__
        index_name=file_name+'.index'
        
        try: f=open(index_name,'rb')
        except: return None
        
        p=Unpickler(f)

440 441 442 443 444 445 446
        try:
            info=p.load()
        except:
            exc, err, tb = sys.exc_info()
            warn("Failed to load database index: %s: %s" %
                 (exc, err))
            return None
447
        index=info.get('index', None)
448
        pos=info.get('pos', None)
449 450 451 452
        oid=info.get('oid', None)
        vindex=info.get('vindex', None)
        if index is None or pos is None or oid is None or vindex is None:
            return None
453
        pos = long(pos)
454 455 456 457 458 459

        tid=self._sane(index, pos)
        if not tid: return None
        
        return index, vindex, pos, oid, tid

Jim Fulton's avatar
Jim Fulton committed
460 461
    def close(self):
        self._file.close()
462 463
        if hasattr(self,'_lock_file'):  self._lock_file.close()
        if self._tfile:                 self._tfile.close()
464 465
        try: self._save_index()
        except: pass # We don't care if this fails.
Jim Fulton's avatar
alpha1  
Jim Fulton committed
466
        
467
    def commitVersion(self, src, dest, transaction, abort=None):
        """Commit, or abort, the current records of version src.

        src -- non-empty name of the source version.
        dest -- name of the destination version; '' for no version.
        transaction -- must be the storage's current transaction.
        abort -- if true, the data back pointers written refer to the
            non-version data instead of the source records; dest must
            then be empty.

        For every current record of src a back-pointer record is
        written to the temporary transaction file.  Returns the list
        of oids for which a record was written.

        Raises POSException.VersionCommitError for invalid arguments
        and POSException.StorageTransactionError for a foreign
        transaction.
        """
        # We are going to commit by simply storing back pointers.
        bad_types = (type(src) is not StringType or
                     type(dest) is not StringType)
        if not src or bad_types:
            raise POSException.VersionCommitError('Invalid source version')

        if src == dest:
            raise POSException.VersionCommitError(
                "Can't commit to same version: %s" % repr(src))

        if dest and abort:
            raise POSException.VersionCommitError(
                "Internal error, can't abort to a version")

        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            data_file = self._file
            read = data_file.read
            seek = data_file.seek
            tfile = self._tfile
            write = tfile.write
            tindex = self._tindex
            index_get = self._index.get

            srcpos = self._vindex_get(src, 0)
            spos = p64(srcpos)
            # tloc, vlen and a zero data length (data is a back pointer)
            middle = struct.pack(">8sH8s", p64(self._pos), len(dest), z64)

            # heredelta is the size of each record we write: a 42-byte
            # header plus an 8-byte back pointer, plus two 8-byte
            # pointers and the version string when there is a dest.
            if dest:
                sd = p64(self._vindex_get(dest, 0))
                heredelta = 66 + len(dest)
            else:
                sd = ''
                heredelta = 50

            here = self._pos + (tfile.tell() + self._thl)
            oids = []
            tvindex = self._tvindex
            current_oids = {}
            current = current_oids.has_key
            t = None
            tstatus = ' '

            while srcpos:
                seek(srcpos)
                # oid, serial, prev(oid), tloc, vlen, plen, pnv, pv
                h = read(58)
                oid = h[:8]
                pnv = h[-16:-8]
                if index_get(oid, None) == srcpos:
                    # This is a current record!
                    tindex[oid] = here
                    oids.append(oid)
                    write(h[:16] + spos + middle)
                    if dest:
                        tvindex[dest] = here
                        write(pnv + sd + dest)
                        sd = p64(here)

                    # data backpointer to src data
                    write(abort and pnv or spos)
                    here = here + heredelta

                    current_oids[oid] = 1

                elif not current(oid):
                    # A non-current record for an oid without a current
                    # record.  We're done *if* this transaction wasn't
                    # undone.
                    tloc = h[24:32]
                    if t != tloc:
                        # We haven't checked this transaction before,
                        # get its status.
                        t = tloc
                        seek(U64(t) + 16)
                        tstatus = read(1)

                    if tstatus != 'u':
                        # Yee ha! We can quit
                        break

                # Follow the link to the previous record in the version.
                spos = h[-8:]
                srcpos = U64(spos)

            return oids

        finally:
            self._lock_release()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
559 560

    def getSize(self): return self._pos
561 562 563 564 565 566 567 568 569

    def _loada(self, oid, _index, file):
        """Read any version and return the version

        Reads the record for oid at _index[oid] from file and returns
        (data, version, nv).  version is '' for a non-version record.
        nv is true when the record's non-version pointer is not zero.
        When the record holds no data, the data is obtained by
        following its back pointer with _loadBack.
        """
        file.seek(_index[oid])
        read = file.read
        header = read(42)
        doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", header)
        if not vlen:
            version = ''
            nv = 0
        else:
            nv = read(8) != z64
            # Skip previous version record pointer
            file.seek(8, 1)
            version = read(vlen)

        if plen == z64:
            return _loadBack(file, oid, read(8))[0], version, nv
        return read(U64(plen)), version, nv
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596

    def _load(self, oid, version, _index, file):
        """Load the record for oid as seen from version.

        Reads the record at _index[oid] from file.  If it is a version
        record for a version other than the one requested, the result
        of _loadBack on the record's non-version pointer is returned.
        Otherwise returns (data, serial), following the record's back
        pointer when it holds no data.

        Raises CorruptedDataError when the record is for another oid.
        """
        file.seek(_index[oid])
        read = file.read
        header = read(42)
        doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", header)
        if doid != oid:
            raise CorruptedDataError(header)
        if vlen:
            # Read location of non-version data
            pnv = read(8)
            if not version or len(version) != vlen:
                return _loadBack(file, oid, pnv)
            # read(8) skips past the version link; read(vlen) is the
            # version string of the record.
            if read(8) and version != read(vlen):
                return _loadBack(file, oid, pnv)

        # If we get here, then either this was not a version record,
        # or we've already read past the version data!
        if plen != z64:
            return read(U64(plen)), serial
        pnv = read(8)
        # We use the current serial, since that is the one that
        # will get checked when we store.
        return _loadBack(file, oid, pnv)[0], serial
Jim Fulton's avatar
alpha1  
Jim Fulton committed
602 603

    def load(self, oid, version, _stuff=None):
        """Return the result of _load for oid and version.

        The lookup uses the committed index and the data file and
        runs with the storage lock held.  _stuff is not used.
        """
        self._lock_acquire()
        try:
            result = self._load(oid, version, self._index, self._file)
            return result
        finally:
            self._lock_release()
607 608

    def loadSerial(self, oid, serial):
        """Return the data of the revision of oid whose serial is serial.

        Starting at the current record for oid, the chain of previous
        records is followed until a record with the given serial is
        found.  Runs with the storage lock held.

        Raises KeyError(serial) when the chain ends without a match,
        and CorruptedDataError when a record is for another oid.
        """
        self._lock_acquire()
        try:
            data_file = self._file
            seek = data_file.seek
            read = data_file.read
            pos = self._index[oid]
            while 1:
                seek(pos)
                header = read(42)
                doid, dserial, prev, tloc, vlen, plen = unpack(
                    ">8s8s8s8sH8s", header)
                if doid != oid:
                    raise CorruptedDataError(header)
                if dserial == serial:
                    break # Yeee ha!
                # Keep looking for serial
                pos = U64(prev)
                if not pos:
                    raise KeyError(serial)

            if vlen:
                read(8) # Read location of non-version data
                read(8) # skip past version link
                read(vlen) # skip version

            if plen != z64:
                return read(U64(plen))

            # We got a backpointer, probably from a commit.
            backpointer = read(8)
            return _loadBack(data_file, oid, backpointer)[0]
        finally:
            self._lock_release()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
638 639
                    
    def modifiedInVersion(self, oid):
        """Return the version string of the current record for oid.

        Returns '' when the current record is not a version record.
        Runs with the storage lock held.

        Raises CorruptedDataError when the record is for another oid.
        """
        self._lock_acquire()
        try:
            pos = self._index[oid]
            data_file = self._file
            data_file.seek(pos)
            header = data_file.read(34)
            doid, serial, prev, tloc, vlen = unpack(">8s8s8s8sH", header)
            if doid != oid:
                raise CorruptedDataError(pos)
            if not vlen:
                return ''
            # skip plen, pnv, and pv
            data_file.seek(24, 1)
            return data_file.read(vlen)
        finally:
            self._lock_release()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
654

Jim Fulton's avatar
Jim Fulton committed
655
    def store(self, oid, serial, data, version, transaction):
        """Write a new data record for oid to the temporary file.

        oid -- object id.
        serial -- the serial the caller believes is current for oid.
        data -- the data to store.
        version -- version to store in; '' for none.
        transaction -- must be the storage's current transaction.

        Returns the new serial, or ConflictResolution.ResolvedSerial
        when the data had to go through conflict resolution.

        Raises POSException.StorageTransactionError for a foreign
        transaction, CorruptedDataError when the current record is for
        another oid, POSException.VersionLockError when the object is
        locked in a different version, POSException.ConflictError when
        a serial mismatch cannot be resolved, and
        FileStorageQuotaError when the quota is exceeded.
        """
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)

        self._lock_acquire()
        try:
            old = self._index_get(oid, 0)
            pnv = None
            if not old:
                oserial = serial
            else:
                data_file = self._file
                data_file.seek(old)
                read = data_file.read
                header = read(42)
                doid, oserial, sprev, stloc, vlen, splen = unpack(
                    ">8s8s8s8sH8s", header)
                if doid != oid:
                    raise CorruptedDataError(header)
                if vlen:
                    pnv = read(8) # non-version data pointer
                    read(8) # skip past version link
                    locked_version = read(vlen)
                    if version != locked_version:
                        raise POSException.VersionLockError(
                            repr(oid), locked_version)

                if serial != oserial:
                    data = self.tryToResolveConflict(
                        oid, oserial, serial, data)
                    if not data:
                        raise POSException.ConflictError(serial, oserial)

            tfile = self._tfile
            write = tfile.write
            pos = self._pos
            # Position the new record will have in the data file.
            here = pos + (tfile.tell() + self._thl)
            self._tindex[oid] = here
            newserial = self._serial
            record_header = pack(">8s8s8s8sH8s",
                                 oid, newserial, p64(old), p64(pos),
                                 len(version), p64(len(data)))
            write(record_header)
            if version:
                # Keep the existing non-version pointer if there is
                # one; otherwise point at the previous record.
                if pnv:
                    write(pnv)
                else:
                    write(p64(old))
                # Link to last record for this version:
                tvindex = self._tvindex
                pv = tvindex.get(version, 0) or self._vindex_get(version, 0)
                write(p64(pv))
                tvindex[version] = here
                write(version)

            write(data)

            # Check quota
            quota = self._quota
            if quota is not None and pos + (tfile.tell() + self._thl) > quota:
                raise FileStorageQuotaError(
                    'The storage quota has been exceeded.')

            return (serial == oserial and newserial
                    or ConflictResolution.ResolvedSerial)

        finally:
            self._lock_release()
720

721
    def supportsUndo(self):
        """Return 1: this storage supports undo."""
        return 1
Jim Fulton's avatar
alpha1  
Jim Fulton committed
722 723
    def supportsVersions(self):
        """Return 1: this storage supports versions."""
        return 1

724
    def _clear_temp(self):
725
        self._tindex.clear()
726 727 728 729 730
        self._tvindex.clear()
        self._tfile.seek(0)

    def _begin(self, tid, u, d, e):
        self._thl=23+len(u)+len(d)+len(e)
731 732 733 734 735
        self._nextpos=0

    def tpc_vote(self, transaction):
        """Copy the pending transaction from the temporary file to the data file.

        Does nothing when transaction is not the current transaction
        or when the temporary file holds no data.  Otherwise the
        transaction header (with status 'c'), the buffered data
        records and the trailing length are written at self._pos, and
        self._nextpos is set to the position following the
        transaction.  If writing fails, the data file is truncated
        back to self._pos and the error is re-raised.

        Raises FileStorageError when the user name, description or
        extension data is longer than 65535 bytes.
        """
        self._lock_acquire()
        try:
            if transaction is not self._transaction:
                return
            tfile = self._tfile
            dlen = tfile.tell()
            if not dlen:
                return # No data in this trans
            data_file = self._file
            write = data_file.write
            tfile.seek(0)
            tid = self._serial
            user, desc, ext = self._ude
            luser, ldesc, lext = len(user), len(desc), len(ext)

            # We have to check lengths here because struct.pack
            # doesn't raise an exception on overflow!
            if luser > 65535:
                raise FileStorageError('user name too long')
            if ldesc > 65535:
                raise FileStorageError('description too long')
            if lext > 65535:
                raise FileStorageError('too much extension data')

            pos = self._pos
            data_file.seek(pos)
            tl = self._thl + dlen
            stl = p64(tl)

            try:
                # Note that we use a status of 'c', for checkpoint.
                # If this flag isn't cleared, anything after this is
                # suspect.
                header = pack(">8s8scHHH",
                              tid, stl, 'c', luser, ldesc, lext)
                write(header)
                if user:
                    write(user)
                if desc:
                    write(desc)
                if ext:
                    write(ext)

                cp(tfile, data_file, dlen)

                write(stl)
                data_file.flush()
            except:
                # Hm, an error occurred writing out the data. Maybe the
                # disk is full. We don't want any turd at the end.
                data_file.truncate(pos)
                raise

            self._nextpos = pos + (tl + 8)

        finally:
            self._lock_release()
 
787
    def _finish(self, tid, u, d, e):
788 789 790
        nextpos=self._nextpos
        if nextpos:
            file=self._file
791

792 793 794 795
            # Clear the checkpoint flag
            file.seek(self._pos+16)
            file.write(self._tstatus)        
            file.flush()
796

797
            if fsync is not None: fsync(file.fileno())
798

799
            self._pos=nextpos
800

801
            self._index.update(self._tindex)
802
            self._vindex.update(self._tvindex)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
803

804
    def _abort(self):
805 806 807
        if self._nextpos:
            self._file.truncate(self._pos)
            self._nextpos=0
808

Jim Fulton's avatar
alpha1  
Jim Fulton committed
809
    def undo(self, transaction_id):
        """Undo a transaction in place by flagging it as undone.

        transaction_id is the base64 text of the 8-byte transaction id
        followed by the 8-byte packed file position of the transaction
        record, with the trailing base64 padding removed (the form
        undoLog() produces for its 'id' entries).

        Every data record of the transaction must be the record the
        index currently points at for its oid.  If so, the transaction's
        status byte is set to 'u' and the index is pointed back at each
        object's previous record.

        Returns the list of affected oids, or None if the transaction
        was already undone.  Raises UndoError if undo is disabled (a pack
        is in progress or the transaction predates the pack time), the id
        does not match the file, the transaction is not undoable, or a
        later transaction has modified one of its objects.
        """
        self._lock_acquire()
        try:
            self._clear_index()
            transaction_id=base64.decodestring(transaction_id+'==\n')
            tid, tpos = transaction_id[:8], U64(transaction_id[8:])
            packt=self._packt
            if packt is None or packt > tid:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')

            file=self._file
            seek=file.seek
            read=file.read
            index_get=self._index_get
            unpack=struct.unpack
            seek(tpos)
            h=read(23)
            if len(h) != 23 or h[:8] != tid:
                raise UndoError('Invalid undo transaction id')
            if h[16] == 'u': return      # already undone
            if h[16] != ' ': raise UndoError
            tl=U64(h[8:16])
            ul,dl,el=unpack(">HHH", h[17:23])
            tend=tpos+tl
            pos=tpos+(23+ul+dl+el)       # first data record
            t={}                         # oid -> position of previous record
            while pos < tend:
                # Read the data records for this transaction
                seek(pos)
                h=read(42)
                oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
                plen=U64(splen)
                prev=U64(sprev)
                # A record without a pickle carries an 8-byte back pointer
                dlen=42+(plen or 8)
                if vlen: dlen=dlen+(16+vlen)
                # The record must still be the current one for its oid
                if index_get(oid,0) != pos: raise UndoError
                pos=pos+dlen
                if pos > tend: raise UndoError
                t[oid]=prev

            # Flag the transaction as undone
            seek(tpos+16)
            file.write('u')
            file.flush()
            index=self._index
            for oid, pos in t.items(): index[oid]=pos
            return t.keys()
        finally: self._lock_release()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
857

858
    def supportsTransactionalUndo(self):
        """Report that transactionalUndo() is implemented (always returns 1)."""
        return 1
859

860 861
    def _undoDataInfo(self, oid, pos, tpos):
        """Return the serial, data pointer, data, and version for the oid
        record at pos.

        When tpos is true, the record is read from the temporary
        transaction file instead of the data file.  Its offset there is
        tpos - self._pos - self._thl, and the temporary file's position is
        restored afterwards.

        The returned data pointer is the back pointer stored in the
        record when it has no pickle; otherwise it is the position the
        record was read from.  Raises UndoError if the record found
        belongs to a different oid.
        """
        if tpos:
            file=self._tfile
            pos = tpos - self._pos - self._thl
            tpos=file.tell()   # remember where to put the file pointer back
        else:
            file=self._file

        read=file.read
        file.seek(pos)
        h=read(42)
        roid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
        if roid != oid: raise UndoError('Invalid undo transaction id')
        if vlen:
            read(16) # skip nv pointer and version previous pointer
            version=read(vlen)
        else:
            version=''

        plen = U64(splen)
        if plen:
            data = read(plen)
        else:
            # No pickle: the record stores a back pointer instead
            data=''
            pos=U64(read(8))

        if tpos: file.seek(tpos) # Restore temp file to end

        return serial, pos, data, version
        
892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918
    def _getVersion(self, oid, pos):
        self._file.seek(pos)
        read=self._file.read
        h=read(42)
        doid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
        if vlen:
            h=read(16)
            return read(vlen), h[:8]
        else:
            return '',''
        
    def _getSerial(self, oid, pos):
        self._file.seek(pos+8)
        return self._file.read(8)


    def _transactionalUndoRecord(self, oid, pos, serial, pre, version):
        """Get the undo information for a data record

        oid, pos, serial and version describe the record being undone;
        pre is the file position of the record that preceded it.

        Return a 5-tuple consisting of a pickle, data pointer,
        version, packed non-version data pointer, and current
        position.  If the pickle is true, then the data pointer must
        be 0, but the pickle can be empty *and* the pointer 0.

        Raises UndoError when the record cannot be undone: the current
        record is in a different version, the object was changed later
        and conflict resolution is not possible or fails, or a back
        pointer cannot be followed.
        """

        copy=1 # Can we just copy a data pointer
        tpos=self._tindex.get(oid, 0)
        ipos=self._index.get(oid,0)
        tipos=tpos or ipos
        if tipos != pos:
            # Eek, a later transaction modified the data, but,
            # maybe it is pointing at the same data we are.
            cserial, cdataptr, cdata, cver = self._undoDataInfo(
                oid, ipos, tpos)
            # Versions of undone record and current record *must* match!
            if cver != version:
                raise UndoError('Current and undone versions differ')

            if cdataptr != pos:
                # We aren't sure if we are talking about the same data
                try:
                    if (
                        # The current record wrote a new pickle
                        cdataptr == tipos
                        or
                        # Backpointers are different
                        _loadBackPOS(self._file, oid, p64(pos)) !=
                        _loadBackPOS(self._file, oid, p64(cdataptr))
                        ):
                        if pre and not tpos:
                            copy=0 # we'll try to do conflict resolution
                        else:
                            # We bail if:
                            # - We don't have a previous record, which should
                            #   be impossible.
                            raise UndoError
                except KeyError:
                    # LoadBack gave us a key error. Bail.
                    raise UndoError

        version, snv = self._getVersion(oid, pre)
        if copy:
            # we can just copy our previous-record pointer forward
            return '', pre, version, snv, ipos

        # copy is only cleared inside the branch above, so cserial and
        # cdata are bound whenever we get here.
        try:
            # returns data, serial tuple
            bdata = _loadBack(self._file, oid, p64(pre))[0]
        except KeyError:
            # couldn't find oid; what's the real explanation for this?
            raise UndoError("_loadBack() failed for %s" % repr(oid))
        data=self.tryToResolveConflict(oid, cserial, serial, bdata, cdata)

        if data:
            return data, 0, version, snv, ipos

        raise UndoError('Some data were modified by a later transaction')
970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000

    def transactionalUndo(self, transaction_id, transaction):
        """Undo a transaction, given by transaction_id.

        Do so by writing new data that reverses tyhe action taken by
        the transaction."""        
        # Usually, we can get by with just copying a data pointer, by
        # writing a file position rather than a pickle. Sometimes, we
        # may do conflict resolution, in which case we actually copy
        # new data that results from resolution.
        
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        
        self._lock_acquire()
        try:
            transaction_id=base64.decodestring(transaction_id+'==\n')
            tid, tpos = transaction_id[:8], U64(transaction_id[8:])

            seek=self._file.seek
            read=self._file.read
            unpack=struct.unpack
            write=self._tfile.write

            ostloc = p64(self._pos)
            newserial=self._serial
            here=self._pos+(self._tfile.tell()+self._thl)

            seek(tpos)
            h=read(23)
            if len(h) != 23 or h[:8] != tid: 
1001
                raise UndoError, 'Invalid undo transaction id'
1002 1003
            if h[16] == 'u': return
            if h[16] != ' ':
1004
                raise UndoError, 'non-undoable transaction'
1005 1006 1007 1008 1009
            tl=U64(h[8:16])
            ul,dl,el=unpack(">HHH", h[17:23])
            tend=tpos+tl
            pos=tpos+(23+ul+dl+el)
            tindex={}
1010 1011
            failures={} # keep track of failures, cause we may succeed later
            failed=failures.has_key
1012 1013 1014 1015 1016
            # Read the data records for this transaction
            while pos < tend:
                seek(pos)
                h=read(42)
                oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
1017
                if failed(oid): del failures[oid] # second chance! 
1018 1019 1020 1021 1022 1023 1024 1025 1026 1027
                plen=U64(splen)
                prev=U64(sprev)
                if vlen:
                    dlen=58+vlen+(plen or 8)
                    read(16)
                    version=read(vlen)
                else:
                    dlen=42+(plen or 8)
                    version=''

1028 1029 1030 1031 1032 1033
                try:
                    p, prev, v, snv, ipos = self._transactionalUndoRecord(
                        oid, pos, serial, prev, version)
                except UndoError, v:
                    # Don't fail right away. We may be redeemed later!
                    failures[oid]=v
1034
                else:
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045
                    plen=len(p)                
                    write(pack(">8s8s8s8sH8s",
                               oid, newserial, p64(ipos), ostloc,
                               len(v), p64(plen)))
                    if v:
                        vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
                        write(snv+p64(vprev)+v)
                        self._tvindex[v]=here
                        odlen = 58+len(v)+(plen or 8)
                    else:
                        odlen = 42+(plen or 8)
1046

1047 1048 1049 1050
                    if p: write(p)
                    else: write(p64(prev))
                    tindex[oid]=here
                    here=here+odlen
1051 1052 1053

                pos=pos+dlen
                if pos > tend:
1054
                    raise UndoError, 'non-undoable transaction'
1055

1056 1057
            if failures: raise UndoError(failures)
            self._tindex.update(tindex)
1058 1059 1060 1061 1062 1063
            return tindex.keys()            

        finally: self._lock_release()

    def undoLog(self, first=0, last=-20, filter=None):
        """Return descriptions of undoable transactions, newest first.

        Transactions are scanned backwards from the end of the file.
        Each transaction with status ' ' is described by a dictionary
        with the keys 'id', 'time', 'user_name' and 'description',
        updated with the transaction's unpickled extension data when
        that can be loaded.  Descriptions accepted by `filter` (all of
        them when filter is None) are counted; those whose count index is
        at least `first` are returned, and scanning stops once `last`
        have been counted.  A negative `last` is replaced by
        first - last + 1.

        Scanning also stops at the first transaction older than the
        current pack time.  Raises UndoError while self._packt is None.
        """
        if last < 0: last=first-last+1
        self._lock_acquire()
        try:
            packt=self._packt
            if packt is None:
                raise UndoError(
                    'Undo is currently disabled for database maintenance.<p>')
            pos=self._pos
            if pos < 39: return []
            file=self._file
            seek=file.seek
            read=file.read
            unpack=struct.unpack
            encode=base64.encodestring
            r=[]
            append=r.append
            i=0
            while i < last and pos > 39:
                # The 8 bytes before pos hold the length of the
                # preceding transaction; step back to its start.
                seek(pos-8)
                pos=pos-U64(read(8))-8
                seek(pos)
                h=read(23)
                tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h)
                if tid < packt: break
                if status != ' ': continue
                user=ul and read(ul) or ''
                desc=dl and read(dl) or ''
                # 16 bytes encode to 22 significant base64 characters;
                # the '==' padding and newline are dropped.
                info={'id': encode(tid+p64(pos))[:22],
                      'time': TimeStamp(tid).timeTime(),
                      'user_name': user, 'description': desc}
                if el:
                    # Best effort: extension data that cannot be loaded
                    # or merged is ignored on purpose.
                    try:
                        e=loads(read(el))
                        info.update(e)
                    except: pass
                if filter is None or filter(info):
                    if i >= first: append(info)
                    i=i+1

            return r
        finally: self._lock_release()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1105 1106

    def versionEmpty(self, version):
        """Return 1 if the named version holds no current data, else 0.

        Starting at the position recorded for the version in the version
        index, version records are followed backwards through their
        previous-version-record pointers.  A record that is the current
        record of its object means the version is not empty (0).  A
        record in a transaction that has not been undone means the
        version is empty (1).  Records of undone transactions are
        skipped.

        Raises VersionError when `version` is the empty string.
        """
        if not version:
            # The interface is silent on this case. I think that this should
            # be an error, but Barry thinks this should return 1 if we have
            # any non-version data. This would be excruciatingly painful to
            # test, so I must be right. ;)
            raise POSException.VersionError(
                'The version must be an non-empty string')
        self._lock_acquire()
        try:
            index=self._index
            file=self._file
            seek=file.seek
            read=file.read
            srcpos=self._vindex_get(version, 0)
            t=tstatus=None   # last transaction looked up and its status
            while srcpos:
                seek(srcpos)
                oid=read(8)
                if index[oid]==srcpos: return 0
                h=read(50) # serial, prev(oid), tloc, vlen, plen, pnv, pv
                tloc=h[16:24]
                if t != tloc:
                    # We haven't checked this transaction before,
                    # get its status.
                    t=tloc
                    seek(U64(t)+16)
                    tstatus=read(1)

                if tstatus != 'u': return 1

                # Follow the pointer to the previous version record
                spos=h[-8:]
                srcpos=U64(spos)

            return 1
        finally: self._lock_release()
Jim Fulton's avatar
Jim Fulton committed
1142

1143
    def versions(self, max=None):
        """Return the names of the non-empty versions.

        When max is given, only the first max keys of the version index
        are examined.

        NOTE(review): because the keys are truncated before empty
        versions are filtered out, fewer than max names can be returned
        even if more non-empty versions exist -- confirm whether this is
        intended.
        """
        r=[]
        keys=self._vindex.keys()
        if max is not None: keys=keys[:max]
        for version in keys:
            if self.versionEmpty(version): continue
            r.append(version)
            if max and len(r) >= max: return r

        return r

1155
    def history(self, oid, version=None, size=1, filter=None):
        """Return up to `size` descriptions of revisions of an object.

        Starting at the object's current record, records are followed
        backwards through their previous-record pointers.  While
        `version` is not None, records written in a different version
        are skipped; once a non-version record is reached the version
        restriction is dropped.  Each description is a dictionary built
        from the transaction's extension data plus the keys 'time',
        'user_name', 'description', 'serial', 'version' and 'size'.
        Descriptions are kept only if `filter` is None or accepts them.

        Raises KeyError if the oid is not in the index.
        """
        self._lock_acquire()
        try:
            r=[]
            file=self._file
            seek=file.seek
            read=file.read
            pos=self._index[oid]
            wantver=version

            while 1:
                if len(r) >= size: return r
                seek(pos)
                h=read(42)
                doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
                prev=U64(prev)

                if vlen:
                    # Skip the non-version pointer and the previous
                    # version record pointer
                    read(16)
                    version=read(vlen)
                    if wantver is not None and version != wantver:
                        if prev:
                            pos=prev
                            continue
                        else:
                            return r
                else:
                    version=''
                    wantver=None

                # Read the header of the transaction holding the record
                seek(U64(tloc))
                h=read(23)
                tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
                user_name=read(ul)
                description=read(dl)
                if el: d=loads(read(el))
                else: d={}

                d['time']=TimeStamp(serial).timeTime()
                d['user_name']=user_name
                d['description']=description
                d['serial']=serial
                d['version']=version
                d['size']=U64(plen)

                if filter is None or filter(d):
                    r.append(d)

                if prev: pos=prev
                else: return r
        finally: self._lock_release()
1207

1208 1209
    def _redundant_pack(self, file, pos):
        """Tell whether packing up to pos would be redundant.

        Reads the length stored in the 8 bytes before pos, uses it to
        locate the status byte of the transaction that ends there, and
        returns true when that status is neither ' ' nor 'u'.
        """
        file.seek(pos-8)
        p=U64(file.read(8))
        # The transaction starts at pos-p-8; its status byte is at +16
        file.seek(pos-p+8)
        return file.read(1) not in ' u'
1213

1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229
    def pack(self, t, referencesf):
        """Copy data from the current database file to a packed file
    
        Non-current records from transactions with time-stamp strings less
        than packtss are ommitted. As are all undone records.
    
        Also, data back pointers that point before packtss are resolved and
        the associated data are copied, since the old records are not copied.
        """

        # Ugh, this seems long
        
        packing=1 # are we in the packing phase (or the copy phase)
        locked=0
        _lock_acquire=self._lock_acquire
        _lock_release=self._lock_release
Jim Fulton's avatar
Jim Fulton committed
1230 1231
        _commit_lock_acquire=self._commit_lock_acquire
        _commit_lock_release=self._commit_lock_release
1232 1233
        index, vindex, tindex, tvindex = self._newIndexes()
        name=self.__name__
Jim Fulton's avatar
Jim Fulton committed
1234
        file=open(name, 'rb')
1235
        stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
1236
        if stop==z64: raise FileStorageError, 'Invalid pack time'
1237

1238 1239
        # Record pack time so we don't undo while packing
        _lock_acquire()
1240
        try:
1241
            if self._packt != z64:
1242
                # Already packing.
1243
                raise FileStorageError, 'Already packing'
1244
            self._packt=stop
1245
        finally:
1246
            _lock_release()
1247 1248 1249 1250 1251 1252

        try:
            ##################################################################
            # Step 1, get index as of pack time that
            # includes only referenced objects.

1253
            packpos, maxoid, ltid = read_index(
Jim Fulton's avatar
Jim Fulton committed
1254 1255 1256
                file, name, index, vindex, tindex, stop,
                read_only=1,
                )
1257 1258 1259

            if self._redundant_pack(file, packpos):
                raise FileStorageError, (
1260 1261
                    'The database has already been packed to a later time\n'
                    'or no changes have been made since the last pack')
1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273
    
            rootl=[z64]
            pop=rootl.pop
            pindex={}
            referenced=pindex.has_key
            _load=self._load
            _loada=self._loada
            v=None
            while rootl:
                oid=pop()
                if referenced(oid): continue
                try:
1274
                    p, v, nv = _loada(oid, index, file)
1275
                    referencesf(p, rootl)
1276
                    if nv:
1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308
                        p, serial = _load(oid, '', index, file)
                        referencesf(p, rootl)
    
                    pindex[oid]=index[oid]
                except:
                    pindex[oid]=0
                    error('Bad reference to %s', `(oid,v)`)
    
            spackpos=p64(packpos)
    
            ##################################################################
            # Step 2, copy data and compute new index based on new positions.
            index, vindex, tindex, tvindex = self._newIndexes()
    
            ofile=open(name+'.pack', 'w+b')
    
            # Index for non-version data.  This is a temporary structure
            # to reduce I/O during packing
            nvindex={}
    
            # Cache a bunch of methods
            seek=file.seek
            read=file.read
            oseek=ofile.seek
            write=ofile.write
    
            index_get=index.get
            vindex_get=vindex.get
            pindex_get=pindex.get
    
            # Initialize, 
            pv=z64
1309 1310
            offset=0L  # the amount of space freed by packing
            pos=opos=4L
1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338
            oseek(0)
            write(packed_version)

            # Copy the data in two stages.  In the packing stage,
            # we skip records that are non-current or that are for
            # unreferenced objects. We also skip undone transactions.
            #
            # After the packing stage, we copy everything but undone
            # transactions, however, we have to update various back pointers.
            # We have to have the storage lock in the second phase to keep
            # data from being changed while we're copying.
            pnv=None
            while 1:

                # Check for end of packed records
                if packing and pos >= packpos:
                    # OK, we're done with the old stuff, now we have
                    # to get the lock so we can copy the new stuff!
                    offset=pos-opos
                    if offset <= 0:
                        # we didn't free any space, there's no point in
                        # continuing
                        ofile.close()
                        file.close()
                        os.remove(name+'.pack')
                        return
                    
                    packing=0
Jim Fulton's avatar
Jim Fulton committed
1339
                    _commit_lock_acquire()
1340 1341 1342 1343 1344 1345 1346 1347 1348
                    _lock_acquire()
                    locked=1
                    self._packt=None # Prevent undo until we're done

                # Read the transaction record
                seek(pos)
                h=read(23)
                if len(h) < 23: break
                tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
1349 1350 1351
                if status=='c':
                    # Oops. we found a checkpoint flag.
                    break
1352
                tl=U64(stl)
1353 1354 1355 1356
                tpos=pos
                tend=tpos+tl

                if status=='u':
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
                    if not packing:
                        # We rely below on a constant offset for unpacked
                        # records. This assumption holds only if we copy
                        # undone unpacked data. This is lame, but necessary
                        # for now to squash a bug.
                        write(h)
                        tl=tl+8
                        write(read(tl-23))
                        opos=opos+tl
                        
1367 1368 1369 1370 1371 1372 1373
                    # Undone transaction, skip it
                    pos=tend+8
                    continue

                otpos=opos # start pos of output trans

                # write out the transaction record
1374 1375
                status=packing and 'p' or ' '
                write(h[:16]+status+h[17:])
1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391
                thl=ul+dl+el
                h=read(thl)
                if len(h) != thl:
                    raise 'Pack Error', opos
                write(h)
                thl=23+thl
                pos=tpos+thl
                opos=otpos+thl

                while pos < tend:
                    # Read the data records for this transaction

                    seek(pos)
                    h=read(42)
                    oid,serial,sprev,stloc,vlen,splen = unpack(
                        ">8s8s8s8sH8s", h)
1392
                    plen=U64(splen)
1393 1394 1395
                    dlen=42+(plen or 8)

                    if vlen:
1396
                        dlen=dlen+(16+vlen)
1397 1398 1399 1400 1401 1402
                        if packing and pindex_get(oid,0) != pos:
                            # This is not the most current record, or
                            # the oid is no longer referenced so skip it.
                            pos=pos+dlen
                            continue

1403
                        pnv=U64(read(8))
1404
                        # skip position of previous version record
1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439
                        seek(8,1)
                        version=read(vlen)
                        pv=p64(vindex_get(version, 0))
                        vindex[version]=opos
                    else:
                        if packing:
                            ppos=pindex_get(oid, 0)
                            if ppos != pos:
                                
                                if not ppos:
                                    # This object is no longer referenced
                                    # so skip it.
                                    pos=pos+dlen
                                    continue
                                
                                # This is not the most current record
                                # But maybe it's the most current committed
                                # record.
                                seek(ppos)
                                ph=read(42)
                                pdoid,ps,pp,pt,pvlen,pplen = unpack(
                                    ">8s8s8s8sH8s", ph)
                                if not pvlen:
                                    # The most current record is committed, so
                                    # we can toss this one
                                    pos=pos+dlen
                                    continue
                                pnv=read(8)
                                pnv=_loadBackPOS(file, oid, pnv)
                                if pnv > pos:
                                    # The current non version data is later,
                                    # so this isn't the current record
                                    pos=pos+dlen
                                    continue

1440 1441 1442 1443 1444 1445 1446 1447
                                # Ok, we've gotten this far, so we have
                                # the current record and we're ready to
                                # read the pickle, but we're in the wrong
                                # place, after wandering around to figure
                                # out is we were current. Seek back
                                # to pickle data:
                                seek(pos+42)

1448 1449
                            nvindex[oid]=opos

1450
                    tindex[oid]=opos
1451 1452 1453
                    
                    opos=opos+dlen
                    pos=pos+dlen
1454

1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
                    if plen:
                        p=read(plen)
                    else:
                        p=read(8)
                        if packing:
                            # When packing we resolve back pointers!
                            p, serial = _loadBack(file, oid, p)
                            plen=len(p)
                            opos=opos+plen-8
                            splen=p64(plen)
                        else:
1466
                            p=U64(p)
1467 1468 1469 1470 1471 1472 1473
                            if p < packpos:
                                # We have a backpointer to a
                                # non-packed record. We have to be
                                # careful.  If we were pointing to a
                                # current record, then we should still
                                # point at one, otherwise, we should
                                # point at the last non-version record.
1474 1475 1476 1477 1478 1479 1480 1481
                                ppos=pindex_get(oid,0)
                                if ppos:
                                    if ppos==p:
                                        # we were pointing to the
                                        # current record
                                        p=index[oid]
                                    else:
                                        p=nvindex[oid]
1482
                                else:
1483 1484 1485 1486 1487
                                    # Oops, this object was modified
                                    # in a version in which it was deleted.
                                    # Hee hee. It doesn't matter what we
                                    # use cause it's not reachable any more.
                                    p=0
1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521
                            else:
                                # This points back to a non-packed record.
                                # Just adjust for the offset
                                p=p-offset
                            p=p64(p)
                            
                    sprev=p64(index_get(oid,0))
                    write(pack(">8s8s8s8sH8s",
                               oid,serial,sprev,p64(otpos),vlen,splen))
                    if vlen:
                        if not pnv:
                            write(z64)
                        else:
                            if pnv < packpos:
                                # we need to point to the packed
                                # non-version rec
                                pnv=nvindex[oid]
                            else:
                                # we just need to adjust the pointer
                                # with the offset
                                pnv=pnv-offset
                                
                            write(p64(pnv))
                        write(pv)
                        write(version)

                    write(p)

                # skip the (intentionally redundant) transaction length
                pos=pos+8

                if locked:
                    # temporarily release the lock to give other threads
                    # a chance to do some work!
Jim Fulton's avatar
Jim Fulton committed
1522
                    _commit_lock_release()
1523 1524 1525
                    _lock_release()
                    locked=0

1526 1527
                index.update(tindex) # Record the position
                tindex.clear()
1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554

                # Now, maybe we need to hack or delete the transaction
                otl=opos-otpos
                if otl != tl:
                    # Oops, what came out is not what came in!

                    # Check for empty:
                    if otl==thl:
                        # Empty, slide back over the header:
                        opos=otpos
                        oseek(opos)
                    else:
                        # Not empty, but we need to adjust transaction length
                        # and update the status
                        oseek(otpos+8)
                        otl=p64(otl)
                        write(otl+status)
                        oseek(opos)
                        write(otl)
                        opos=opos+8

                else:
                    write(p64(otl))
                    opos=opos+8


                if not packing:
Jim Fulton's avatar
Jim Fulton committed
1555 1556 1557
                    # We are in the copying phase. We need to get the lock
                    # again to avoid someone writing data while we read it.
                    _commit_lock_acquire()
1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571
                    _lock_acquire()
                    locked=1


            # OK, we've copied everything. Now we need to wrap things
            # up.

            # Hack the files around.
            name=self.__name__

            ofile.flush()
            ofile.close()
            file.close()
            self._file.close()
1572 1573 1574 1575
            try:
                if os.path.exists(name+'.old'):
                    os.remove(name+'.old')
                os.rename(name, name+'.old')
1576 1577 1578 1579 1580 1581 1582 1583 1584 1585
            except:
                # Waaa
                self._file=open(name,'r+b')
                raise

            # OK, we're beyond the point of no return
            os.rename(name+'.pack', name)
            self._file=open(name,'r+b')
            self._initIndex(index, vindex, tindex, tvindex)
            self._pos=opos
1586
            self._save_index()
1587 1588 1589

        finally:

Jim Fulton's avatar
Jim Fulton committed
1590 1591 1592
            if locked:
                _commit_lock_release()
                _lock_release()
1593 1594 1595 1596

            _lock_acquire()
            self._packt=z64
            _lock_release()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1597

1598 1599 1600
    def iterator(self):
        """Return a new FileIterator built from self._file_name."""
        return FileIterator(self._file_name)
        

Jim Fulton's avatar
Jim Fulton committed
1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638
def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
    """Copy transactions forward in the data file

    This might be done as part of a recovery effort.

    Transaction records are read from ``file`` starting at offset ``pos``
    and written back into the same file starting at offset ``opos``.
    For every data record copied:

    - the previous-record pointer is replaced by the position currently
      recorded for the oid in ``index`` (0 if there is none);
    - the transaction pointer is replaced by the new start position of
      the enclosing transaction;
    - a back pointer (used when the record carries no pickle) or a
      non-version pointer that refers to a position at or after ``pos``
      is reduced by ``pos - opos``; one that falls between ``opos`` and
      ``pos`` is replaced by the oid's position in ``index``.

    ``tindex`` collects the new positions of the records of the
    transaction being copied; it is merged into ``index`` (unless the
    transaction status is 'u') and cleared after each transaction.
    ``vindex`` is updated with the new position of each version record
    of a transaction whose status is not 'u'.

    Copying stops at a short (less than 23 bytes) transaction header or
    at a transaction whose status is 'c' (checkpoint flag).

    Returns the output position just past the last transaction written.

    Raises POSException.StorageSystemError if the variable-length part
    of a transaction header cannot be read completely.
    """

    # Cache a bunch of methods
    seek = file.seek
    read = file.read
    write = file.write

    index_get = index.get
    vindex_get = vindex.get

    # p1 is where output starts, p2 is where input starts.  Pointers to
    # positions >= p2 must be reduced by offset; pointers to positions in
    # [p1, p2) refer to data that is being overwritten.
    pv = z64
    p1 = opos
    p2 = pos
    offset = p2 - p1

    pnv = None
    while 1:

        # Read the transaction record
        seek(pos)
        h = read(23)
        if len(h) < 23: break
        tid, stl, status, ul, dl, el = unpack(">8s8scHHH", h)
        if status == 'c': break # Oops. we found a checkpoint flag.
        tl = U64(stl)
        tpos = pos
        tend = tpos + tl

        otpos = opos # start pos of output trans

        # Variable-length part of the header: user, description, extension
        thl = ul + dl + el
        h2 = read(thl)
        if len(h2) != thl:
            # This used to be a string exception ('Pack Error'), which
            # later Python versions refuse to raise.
            raise POSException.StorageSystemError(
                'Pack Error at %s' % opos)

        # write out the transaction record
        seek(opos)
        write(h)
        write(h2)

        thl = 23 + thl
        pos = tpos + thl
        opos = otpos + thl

        while pos < tend:
            # Read the data records for this transaction
            seek(pos)
            h = read(42)
            oid, serial, sprev, stloc, vlen, splen = unpack(
                ">8s8s8s8sH8s", h)
            plen = U64(splen)
            # A record without a pickle carries an 8-byte back pointer
            dlen = 42 + (plen or 8)

            if vlen:
                dlen = dlen + (16 + vlen)
                pnv = U64(read(8))
                # skip position of previous version record
                seek(8, 1)
                version = read(vlen)
                pv = p64(vindex_get(version, 0))
                if status != 'u': vindex[version] = opos

            tindex[oid] = opos

            if plen: p = read(plen)
            else:
                p = read(8)
                p = U64(p)
                if p >= p2: p = p - offset
                elif p >= p1:
                    # Ick, we're in trouble. Let's bail
                    # to the index and hope for the best
                    p = index_get(oid, 0)
                p = p64(p)

            # WRITE
            seek(opos)
            sprev = p64(index_get(oid, 0))
            write(pack(">8s8s8s8sH8s",
                       oid, serial, sprev, p64(otpos), vlen, splen))
            if vlen:
                if not pnv: write(z64)
                else:
                    if pnv >= p2: pnv = pnv - offset
                    elif pnv >= p1:
                        pnv = index_get(oid, 0)

                    write(p64(pnv))
                write(pv)
                write(version)

            write(p)

            opos = opos + dlen
            pos = pos + dlen

        # skip the (intentionally redundant) transaction length
        pos = pos + 8

        if status != 'u':
            index.update(tindex) # Record the position

        tindex.clear()

        # The file is positioned at opos here (just past the last write),
        # so this writes the redundant trailing transaction length.
        write(stl)
        opos = opos + 8

    return opos

def search_back(file, pos):
    """Step backward from the end of file over trailing length fields.

    Starting with p at the end of ``file``, the 8 bytes just before p
    are read as a length l and p is moved back by l+8.  This repeats
    while p is greater than ``pos``; it stops early if a length of zero
    is read.

    Returns a tuple (p, s): the position reached and the file size.
    """
    seek = file.seek
    read = file.read
    seek(0, 2)
    s = p = file.tell()
    while p > pos:
        seek(p - 8)
        l = U64(read(8))
        if l <= 0: break
        p = p - l - 8

    return p, s

def recover(file_name):
    """Try to repair the data file named ``file_name`` in place.

    The file is scanned with read_index() in recover mode.  If that
    returns a non-None oid there is nothing to do.  Otherwise the
    returned position is taken as the start of the damage, search_back()
    is used to find a position to resume from, the transactions from
    there on are copied down over the damage with
    shift_transactions_forward(), and the file is truncated after the
    last transaction copied.  If search_back() finds nothing before the
    end of the file, the file is simply truncated at the start of the
    damage.

    A one-line summary is printed.  The file is closed before returning.
    """
    file = open(file_name, 'r+b')
    try:
        index = {}
        vindex = {}
        tindex = {}

        pos, oid, tid = read_index(
            file, file_name, index, vindex, tindex, recover=1)
        if oid is not None:
            print("Nothing to recover")
            return

        # In recover mode read_index can return from the middle of a
        # transaction, leaving that transaction's records in tindex.
        # They must not leak into index when transactions are shifted.
        tindex.clear()

        opos = pos
        pos, sz = search_back(file, pos)

        # Default to cutting the file at the damage; this used to be
        # left unbound (NameError) when there was nothing to shift.
        npos = opos
        if pos < sz:
            npos = shift_transactions_forward(
                index, vindex, tindex, file, pos, opos,
                )

        file.truncate(npos)

        print("Recovered file, lost %s, ended up with %s bytes" % (
            pos-opos, npos))
    finally:
        file.close()

    

1761
def read_index(file, name, index, vindex, tindex, stop='\377'*8,
Jim Fulton's avatar
Jim Fulton committed
1762
               ltid=z64, start=4L, maxoid=z64, recover=0, read_only=0):
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1763 1764 1765 1766 1767
    
    read=file.read
    seek=file.seek
    seek(0,2)
    file_size=file.tell()
1768

Jim Fulton's avatar
alpha1  
Jim Fulton committed
1769
    if file_size:
1770 1771 1772 1773
        if file_size < start: raise FileStorageFormatError, file.name
        seek(0)
        if read(4) != packed_version: raise FileStorageFormatError, name
    else:
Jim Fulton's avatar
Jim Fulton committed
1774
        if not read_only: file.write(packed_version)
1775
        return 4L, maxoid, ltid
1776 1777 1778

    index_get=index.get
    vndexpos=vindex.get
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1779

1780 1781
    pos=start
    seek(start)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1782
    unpack=struct.unpack
Jim Fulton's avatar
Jim Fulton committed
1783
    tid='\0'*7+'\1'
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1784 1785 1786

    while 1:
        # Read the transaction record
1787
        h=read(23)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1788
        if not h: break
1789
        if len(h) != 23:
Jim Fulton's avatar
Jim Fulton committed
1790 1791 1792 1793
            if not read_only:
                warn('%s truncated at %s', name, pos)
                seek(pos)
                file.truncate()
Jim Fulton's avatar
Jim Fulton committed
1794 1795
            break

1796
        tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
Jim Fulton's avatar
Jim Fulton committed
1797 1798 1799
        if el < 0: el=t32-el

        if tid <= ltid:
1800
            warn("%s time-stamp reduction at %s", name, pos)
Jim Fulton's avatar
Jim Fulton committed
1801 1802
        ltid=tid

1803
        tl=U64(stl)
Jim Fulton's avatar
Jim Fulton committed
1804

1805
        if pos+(tl+8) > file_size or status=='c':
1806 1807
            # Hm, the data were truncated or the checkpoint flag wasn't
            # cleared.  They may also be corrupted,
Jim Fulton's avatar
Jim Fulton committed
1808
            # in which case, we don't want to totally lose the data.
Jim Fulton's avatar
Jim Fulton committed
1809 1810 1811 1812
            if not read_only:
                warn("%s truncated, possibly due to damaged records at %s",
                     name, pos)
                _truncate(file, name, pos)
Jim Fulton's avatar
Jim Fulton committed
1813 1814 1815
            break

        if status not in ' up':
1816
            warn('%s has invalid status, %s, at %s', name, status, pos)
Jim Fulton's avatar
Jim Fulton committed
1817

1818
        if tl < (23+ul+dl+el):
1819 1820 1821 1822 1823 1824
            # We're in trouble. Find out if this is bad data in the
            # middle of the file, or just a turd that Win 9x dropped
            # at the end when the system crashed.
            # Skip to the end and read what should be the transaction length
            # of the last transaction.
            seek(-8, 2)
1825
            rtl=U64(read(8))
1826 1827 1828 1829
            # Now check to see if the redundant transaction length is
            # reasonable:
            if file_size - rtl < pos or rtl < 23:
                nearPanic('%s has invalid transaction header at %s', name, pos)
Jim Fulton's avatar
Jim Fulton committed
1830 1831 1832 1833 1834 1835
                if not read_only:
                    warn("It appears that there is invalid data at the end of "
                         "the file, possibly due to a system crash.  %s "
                         "truncated to recover from bad data at end."
                         % name)
                    _truncate(file, name, pos)
1836 1837
                break
            else:
1838
                if recover: return pos, None, None
1839
                panic('%s has invalid transaction header at %s', name, pos)
Jim Fulton's avatar
Jim Fulton committed
1840

Jim Fulton's avatar
alpha1  
Jim Fulton committed
1841
        if tid >= stop: break
Jim Fulton's avatar
Jim Fulton committed
1842

Jim Fulton's avatar
alpha1  
Jim Fulton committed
1843 1844 1845 1846 1847
        tpos=pos
        tend=tpos+tl
        
        if status=='u':
            # Undone transaction, skip it
1848
            seek(tend)
Jim Fulton's avatar
Jim Fulton committed
1849 1850
            h=read(8)
            if h != stl:
Jim Fulton's avatar
Jim Fulton committed
1851
                if recover: return tpos, None, None
1852
                panic('%s has inconsistent transaction length at %s',
Jim Fulton's avatar
Jim Fulton committed
1853
                      name, pos)
1854
            pos=tend+8
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1855 1856
            continue

1857
        pos=tpos+(23+ul+dl+el)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1858 1859 1860 1861
        while pos < tend:
            # Read the data records for this transaction

            seek(pos)
Jim Fulton's avatar
Jim Fulton committed
1862 1863
            h=read(42)
            oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
1864 1865 1866
            prev=U64(sprev)
            tloc=U64(stloc)
            plen=U64(splen)
Jim Fulton's avatar
Jim Fulton committed
1867
            
1868
            dlen=42+(plen or 8)
1869
            tindex[oid]=pos
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1870 1871
            
            if vlen:
1872
                dlen=dlen+(16+vlen)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1873
                seek(8,1)
1874
                pv=U64(read(8))
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1875
                version=read(vlen)
1876 1877
                # Jim says: "It's just not worth the bother."
                #if vndexpos(version, 0) != pv:
1878
                #    panic("%s incorrect previous version pointer at %s",
1879
                #          name, pos)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1880
                vindex[version]=pos
Jim Fulton's avatar
Jim Fulton committed
1881 1882

            if pos+dlen > tend or tloc != tpos:
Jim Fulton's avatar
Jim Fulton committed
1883
                if recover: return tpos, None, None
1884
                panic("%s data record exceeds transaction record at %s",
Jim Fulton's avatar
Jim Fulton committed
1885
                      name, pos)
Jim Fulton's avatar
Jim Fulton committed
1886
                
1887
            if index_get(oid,0) != prev:
1888
                if prev:
Jim Fulton's avatar
Jim Fulton committed
1889
                    if recover: return tpos, None, None
Jim Fulton's avatar
Jim Fulton committed
1890
                    error("%s incorrect previous pointer at %s", name, pos)
1891 1892
                else:
                    warn("%s incorrect previous pointer at %s", name, pos)
Jim Fulton's avatar
Jim Fulton committed
1893

Jim Fulton's avatar
alpha1  
Jim Fulton committed
1894 1895
            pos=pos+dlen

Jim Fulton's avatar
Jim Fulton committed
1896
        if pos != tend:
Jim Fulton's avatar
Jim Fulton committed
1897
            if recover: return tpos, None, None
1898
            panic("%s data records don't add up at %s",name,tpos)
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1899 1900 1901

        # Read the (intentionally redundant) transaction length
        seek(pos)
Jim Fulton's avatar
Jim Fulton committed
1902 1903
        h=read(8)
        if h != stl:
Jim Fulton's avatar
Jim Fulton committed
1904
            if recover: return tpos, None, None
1905
            panic("%s redundant transaction length check failed at %s",
Jim Fulton's avatar
Jim Fulton committed
1906 1907
                  name, pos)
        pos=pos+8
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1908
        
1909
        for oid, p in tindex.items():
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1910 1911 1912
            maxoid=max(maxoid,oid)
            index[oid]=p # Record the position

1913
        tindex.clear()
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1914

1915
    return pos, maxoid, ltid
Jim Fulton's avatar
alpha1  
Jim Fulton committed
1916 1917 1918


def _loadBack(file, oid, back):
    """Follow back pointers until a record that carries data is found.

    ``back`` is a packed 8-byte file position of a data record.  If the
    record there has a non-zero data length, its data and serial are
    returned as a tuple (data, serial).  Otherwise the 8 bytes following
    the record header (and version information, if any) are taken as the
    next back pointer and the search continues.

    Raises KeyError(oid) when a back pointer of zero is reached.
    """
    seek = file.seek
    read = file.read

    while 1:
        old = U64(back)
        if not old: raise KeyError(oid)
        seek(old)
        h = read(42)
        doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", h)

        # Skip the two 8-byte version pointers and the version string
        if vlen: seek(vlen+16, 1)
        if plen != z64: return read(U64(plen)), serial
        back = read(8) # We got a back pointer!
1932

1933
def _loadBackPOS(file, oid, back):
    """Return the position of the record containing the data used by
    the record at the given position (back).

    ``back`` is a packed 8-byte file position.  Back pointers are
    followed from there until a record with a non-zero data length is
    found; the (unpacked) position of that record is returned.

    Raises KeyError(oid) when a back pointer of zero is reached.
    """
    seek = file.seek
    read = file.read

    while 1:
        old = U64(back)
        if not old: raise KeyError(oid)
        seek(old)
        h = read(42)
        doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", h)
        # Skip the two 8-byte version pointers and the version string
        if vlen: seek(vlen+16, 1)
        if plen != z64: return old
        back = read(8) # We got a back pointer!
1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972

def _truncate(file, name, pos):
    """Truncate ``file`` at ``pos`` after saving the data being cut off.

    Everything from ``pos`` to the end of the file is first copied to a
    new file named '<name>.tr<i>', using the smallest i >= 0 for which
    no such file exists.  Only if that succeeds is ``file`` truncated.

    Raises POSException.StorageSystemError, after logging an error, if
    the data could not be saved for any reason; ``file`` is left
    untruncated in that case.
    """
    seek = file.seek
    seek(0, 2)
    file_size = file.tell()
    try:
        # Find an unused name for the saved data
        i = 0
        oname = '%s.tr%s' % (name, i)
        while os.path.exists(oname):
            i = i+1
            oname = '%s.tr%s' % (name, i)

        warn("Writing truncated data from %s to %s", name, oname)
        o = open(oname, 'wb')
        try:
            seek(pos)
            cp(file, o, file_size-pos)
        finally:
            # Don't leak the output file if the copy fails
            o.close()
    except:
        # Any failure at all means the data were not saved, so the
        # truncation must not go ahead.
        error("couldn\'t write truncated data for %s", name)
        raise POSException.StorageSystemError(
            "Couldn't save truncated data")

    seek(pos)
    file.truncate()
1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995

class Iterator:
    """Base class adapting a next(index) method to the for-loop protocol.

    Subclasses supply next(index), which returns item number ``index``
    or raises IndexError when there are no more items.  Items are
    fetched strictly in order; asking for an item at or before the
    last one fetched returns the last one fetched.
    """
    __index = -1
    __current = None

    def __getitem__(self, i):
        position = self.__index
        while position < i:
            position = position + 1
            self.__current = self.next(position)

        # Only remember the new position once every fetch has succeeded
        self.__index = position
        return self.__current


class FileIterator(Iterator):
    """Iterate over the transactions in a FileStorage file.
    """
    _ltid=z64
    
    def __init__(self, file):
1996
        if type(file) is type(''): file=open(file, 'rb')
1997 1998 1999 2000
        self._file=file
        if file.read(4) != packed_version: raise FileStorageFormatError, name
        file.seek(0,2)
        self._file_size=file.tell()
2001
        self._pos=4L
2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021

    def next(self, index=0):
        file=self._file
        seek=file.seek
        read=file.read
        pos=self._pos

        while 1:
            # Read the transaction record
            seek(pos)
            h=read(23)
            if len(h) < 23: break

            tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
            if el < 0: el=t32-el

            if tid <= self._ltid:
                warn("%s time-stamp reduction at %s", name, pos)
            self._ltid=tid

2022
            tl=U64(stl)
2023

2024
            if pos+(tl+8) > self._file_size or status=='c':
2025 2026 2027 2028 2029 2030 2031 2032 2033 2034
                # Hm, the data were truncated or the checkpoint flag wasn't
                # cleared.  They may also be corrupted,
                # in which case, we don't want to totally lose the data.
                warn("%s truncated, possibly due to damaged records at %s",
                     name, pos)
                break

            if status not in ' up':
                warn('%s has invalid status, %s, at %s', name, status, pos)

2035
            if tl < (23+ul+dl+el):
2036 2037 2038 2039 2040
                # We're in trouble. Find out if this is bad data in
                # the middle of the file, or just a turd that Win 9x
                # dropped at the end when the system crashed.  Skip to
                # the end and read what should be the transaction
                # length of the last transaction.
2041
                seek(-8, 2)
2042
                rtl=U64(read(8))
2043 2044 2045 2046 2047
                # Now check to see if the redundant transaction length is
                # reasonable:
                if self._file_size - rtl < pos or rtl < 23:
                    nearPanic('%s has invalid transaction header at %s',
                              name, pos)
2048 2049 2050
                    warn("It appears that there is invalid data at the end of "
                         "the file, possibly due to a system crash.  %s "
                         "truncated to recover from bad data at end."
2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071
                         % name)
                    break
                else:
                    warn('%s has invalid transaction header at %s', name, pos)
                    break

            # if tid >= stop: raise IndexError, index

            tpos=pos
            tend=tpos+tl

            if status=='u':
                # Undone transaction, skip it
                seek(tend)
                h=read(8)
                if h != stl:
                    panic('%s has inconsistent transaction length at %s',
                          name, pos)
                pos=tend+8
                continue

2072
            pos=tpos+(23+ul+dl+el)
2073 2074 2075
            user=read(ul)
            description=read(dl)
            if el:
2076 2077 2078 2079
                try: e=loads(read(el))
                except: e={}
            else: e={}

2080
            result=RecordIterator(
2081
                tid, status, user, description, e,
2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104
                pos, (tend, file, seek, read,
                      tpos,
                      )
                )

            pos=tend

            # Read the (intentionally redundant) transaction length
            seek(pos)
            h=read(8)
            if h != stl:
                warn("%s redundant transaction length check failed at %s",
                     name, pos)
                break
            self._pos=pos+8

            return result

        raise IndexError, index
    
class RecordIterator(Iterator):
    """Iterate over the data records of one transaction.

    Each item is a Record.  The transaction's id, status, user and
    description are available as the attributes tid, status, user and
    description.
    """
    def __init__(self, tid, status, user, desc, ext, pos, stuff):
        """Remember the transaction meta data and where its records are.

        ``pos`` is the file position of the first data record.
        ``stuff`` is a tuple (tend, file, seek, read, tpos): the
        position at which the data records end, the file with its seek
        and read methods, and the position of the transaction itself.
        """
        self.tid = tid
        self.status = status
        self.user = user
        self.description = desc
        self._extension = ext
        self._pos = pos
        self._stuff = stuff

    def next(self, index=0):
        """Return the next Record of the transaction.

        Raises IndexError(index) when all records have been returned or
        when the next record fails a consistency check.
        """
        # There is no file name to report here
        name = ''
        pos = self._pos
        tend, file, seek, read, tpos = self._stuff
        while pos < tend:
            # Read the data records for this transaction

            seek(pos)
            h = read(42)
            oid, serial, sprev, stloc, vlen, splen = unpack(
                ">8s8s8s8sH8s", h)
            tloc = U64(stloc)
            plen = U64(splen)

            # A record without a pickle carries an 8-byte back pointer
            dlen = 42+(plen or 8)

            if vlen:
                dlen = dlen+(16+vlen)
                seek(8, 1)
                pv = U64(read(8))
                version = read(vlen)
            else:
                version = ''

            if pos+dlen > tend or tloc != tpos:
                warn("%s data record exceeds transaction record at %s",
                     name, pos)
                break

            self._pos = pos+dlen
            if plen: p = read(plen)
            else:
                # No pickle here; fetch it through the back pointer
                p = read(8)
                p = _loadBack(file, oid, p)[0]

            r = Record(oid, serial, version, p)

            return r

        raise IndexError(index)
2154
    
2155 2156 2157 2158 2159

class Record:
    """An abstract database record

    Instances have the attributes oid, serial, version and data.
    """
    def __init__(self, *args):
        """Take exactly four arguments: oid, serial, version and data."""
        self.oid, self.serial, self.version, self.data = args