Commit b04cd609 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 517db5de
......@@ -892,8 +892,74 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
return data, tid, nil
}
// --- raw iteration ---
// TxnIter is iterator over transaction records
type TxnIter struct {
fsSeq *xbufio.SeqReaderAt
Txnh TxnHeader // current transaction record information
Flags iterFlags // XXX iterate forward (> 0) / backward (< 0) / EOF reached (== 0)
}
// DataIter is iterator over data records inside one transaction
type DataIter struct {
fsSeq *xbufio.SeqReaderAt
Txnh *TxnHeader // header of transaction we are iterating inside
Datah DataHeader // current data record information
}
// Iter is combined 2-level iterator over transaction and data records
type Iter struct {
TxnIter
DataIter
}
// NextTxn iterates to next/previous transaction record according to iteration direction
func (ti *TxnIter) NextTxn(flags TxnLoadFlags) error {
var err error
if ti.Flags & iterDir != 0 {
err = ti.Txnh.LoadNext(ti.fsSeq, flags)
} else {
err = ti.Txnh.LoadPrev(ti.fsSeq, flags)
}
//fmt.Println("loaded:", ti.Txnh.Tid)
return err
}
// NextData iterates to next data record header
func (di *DataIter) NextData() error {
return di.Datah.LoadNext(di.fsSeq, di.Txnh)
}
// NextTxn iterates to next transaction record and resets data iterator to iterate inside it
func (iter *Iter) NextTxn() error {
err := iter.TxnIter.NextTxn()
if err != nil {
return err
}
// set .DataIter to iterate over .TxnIter.Txnh
iter.DataIter.Datah.Pos = fsi.txnIter.Txnh.DataPos()
iter.DataIter.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
}
// IterateRaw ... XXX
func (fs *FileStorage) IterateRaw(dir/*XXX fwd/back*/) *Iter {
// when iterating use IO optimized for sequential access
fsSeq := xbufio.NewSeqReaderAt(fs.file)
// XXX setup .TxnIter.dir and start
iter.TxnIter.fsSeq = fsSeq
iter.DataIter.fsSeq = fsSeq
iter.DataIter.Txnh = &iter.txnIter.Txnh
return iter
}
// zodb.IStorage iteration
// --- zodb.IStorage iteration ---
type iterFlags int
const (
......@@ -902,61 +968,41 @@ const (
iterPreloaded // data for this iteration was already preloaded
)
// txnIter is iterator over transaction records
type txnIter struct {
fsSeq *xbufio.SeqReaderAt
// zIter is transaction/data-records iterator as specified by zodb.IStorage
type zIter struct {
iter Iter
Txnh TxnHeader // current transaction information
TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir
Flags iterFlags // iterate forward (> 0) / backward (< 0) / EOF reached (== 0)
}
// dataIter is iterator over data records inside one transaction
type dataIter struct {
fsSeq *xbufio.SeqReaderAt
Txnh *TxnHeader // header of transaction we are iterating inside
Datah DataHeader
Flags iterFlags
// data header for data loading
// XXX need to use separate dh because x.LoadData() changes x state while going through backpointers.
// XXX here to avoid allocations
// ( NOTE: need to use separate dh because x.LoadData() changes x state
// while going through backpointers.
//
// here to avoid allocations )
dhLoading DataHeader
sri zodb.StorageRecordInformation // ptr to this will be returned by NextData
dataBuf []byte
}
// iterator is transaction/data-records iterator as specified by zodb.IStorage
type iterator struct {
txnIter txnIter
dataIter dataIter
}
func (ti *txnIter) NextTxn(flags TxnLoadFlags) error {
// NextTxn iterates to next/previous transaction record according to iteration direction
func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
switch {
case ti.Flags & iterEOF != 0:
//println("already eof")
return io.EOF
// XXX needed?
case ti.Flags & iterPreloaded != 0:
// first element is already there - preloaded by who initialized txnIter
// first element is already there - preloaded by who initialized TxnIter
ti.Flags &= ^iterPreloaded
//fmt.Println("preloaded:", ti.Txnh.Tid)
default:
var err error
if ti.Flags & iterDir != 0 {
err = ti.Txnh.LoadNext(ti.fsSeq, flags)
} else {
err = ti.Txnh.LoadPrev(ti.fsSeq, flags)
}
err := zi.iter.NextTxn(LoadAll)
// XXX EOF ^^^ is not expected (range pre-cut to valid tids) ?
//fmt.Println("loaded:", ti.Txnh.Tid)
if err != nil {
return err
}
......@@ -970,48 +1016,51 @@ func (ti *txnIter) NextTxn(flags TxnLoadFlags) error {
return io.EOF
}
return nil
return &zi.iter.Txnh.TxnInfo, zi, nil
}
func (di *dataIter) NextData() (*zodb.StorageRecordInformation, error) {
err := di.Datah.LoadNext(di.fsSeq, di.Txnh)
// NextData iterates to next data record and loads data content
func (zi *zIter) NextData() (*zodb.StorageRecordInformation, error) {
err := zi.iter.NextData()
if err != nil {
return nil, err // XXX recheck
}
di.sri.Oid = di.Datah.Oid
di.sri.Tid = di.Datah.Tid
zi.sri.Oid = zi.iter.Datah.Oid
zi.sri.Tid = zi.iter.Datah.Tid
// NOTE dh.LoadData() changes dh state while going through backpointers -
// - need to use separate dh because of this
di.dhLoading = di.Datah
di.sri.Data = di.dataBuf
err = di.dhLoading.LoadData(di.fsSeq, &di.sri.Data)
zi.dhLoading = zi.iter.Datah
zi.sri.Data = zi.dataBuf
err = zi.dhLoading.LoadData(zi.iter.DataIter.fsSeq, &zi.sri.Data)
if err != nil {
return nil, err // XXX recheck
}
// if memory was reallocated - use it next time
if cap(di.sri.Data) > cap(di.dataBuf) {
di.dataBuf = di.sri.Data
if cap(zi.sri.Data) > cap(zi.dataBuf) {
zi.dataBuf = zi.sri.Data
}
di.sri.DataTid = di.dhLoading.Tid
return &di.sri, nil
zi.sri.DataTid = zi.dhLoading.Tid
return &zi.sri, nil
}
func (fsi *iterator) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
err := fsi.txnIter.NextTxn(LoadAll)
if err != nil {
return nil, nil, err // XXX recheck
}
// set .dataIter to iterate over .txnIter.Txnh
fsi.dataIter.Datah.Pos = fsi.txnIter.Txnh.DataPos()
fsi.dataIter.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
return &fsi.txnIter.Txnh.TxnInfo, &fsi.dataIter, nil
// iterStartError is the iterator created when there are preparatory errors
// this way we offload clients, besides handling NextTxn errors, from also
// handling error cases from Iterate.
//
// XXX bad idea? (e.g. it will prevent from devirtualizing what Iterate returns)
type iterStartError struct {
err error
}
func (e *iterStartError) NextTxn(*zodb.TxnInfo, zodb.IStorageIterator, error) {
return nil, nil, e.err
}
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
......@@ -1024,14 +1073,15 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
tidMax = fs.txnhMax.Tid
}
// XXX naming
Iter := iterator{}
// XXX naming -> ziter?
ziter := &zIter{iter}
// when iterating use IO optimized for sequential access
// XXX -> IterateRaw
fsSeq := xbufio.NewSeqReaderAt(fs.file)
Iter.txnIter.fsSeq = fsSeq
Iter.dataIter.fsSeq = fsSeq
Iter.dataIter.Txnh = &Iter.txnIter.Txnh
iter.txnIter.fsSeq = fsSeq
iter.dataIter.fsSeq = fsSeq
iter.dataIter.Txnh = &iter.txnIter.Txnh
if tidMin > tidMax {
Iter.txnIter.Flags |= iterEOF // empty
......@@ -1065,7 +1115,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
}
if err != nil {
panic(err) // XXX
return &iterStartError{err} // XXX err ctx
}
//fmt.Printf("tidRange: %v..%v -> found %v @%v\n", tidMin, tidMax, iter.Txnh.Tid, iter.Txnh.Pos)
......@@ -1075,9 +1125,9 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
iter.Flags &= ^iterEOF
if iter.Flags&iterDir != 0 {
// when ^^^ we were searching forward first txn was already found
err = iter.Txnh.loadStrings(fs.file) // XXX ok? XXX -> move NextTxn() ?
err = iter.Txnh.loadStrings(fsSeq) // XXX ok? XXX -> move NextTxn() ?
if err != nil {
panic(err) // XXX
return &iterStartError{err}
}
iter.Flags |= iterPreloaded
}
......
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package fs1tools
//go:generate sh -c "python2 -c 'from ZODB.FileStorage import fsdump; fsdump.main()' ../testdata/1.fs >testdata/1.fsdump.ok"
//go:generate sh -c "python2 -c 'from ZODB.FileStorage.fsdump import Dumper; import sys; d = Dumper(sys.argv[1]); d.dump()' ../testdata/1.fs >testdata/1.fsdumpv.ok"
This diff is collapsed.
This diff is collapsed.
......@@ -165,6 +165,7 @@ type IStorage interface {
// tpc_finish(txn, callback) XXX clarify about callback
// tpc_abort(txn)
// XXX text
Iterate(tidMin, tidMax Tid) IStorageIterator // XXX , error ?
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment