Commit 3be18ddb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 558c79cf
...@@ -140,13 +140,19 @@ const ( ...@@ -140,13 +140,19 @@ const (
Magic = "FS21" // every FileStorage file starts with this Magic = "FS21" // every FileStorage file starts with this
// on-disk sizes // on-disk sizes
FileHeaderSize = 4
TxnHeaderFixSize = 8+8+1+2+2+2 // without user/desc/ext strings TxnHeaderFixSize = 8+8+1+2+2+2 // without user/desc/ext strings
txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record txnXHeaderFixSize = 8 + TxnHeaderFixSize // ^^^ with trail LenPrev from previous record
DataHeaderSize = 8+8+8+8+2+8 DataHeaderSize = 8+8+8+8+2+8
// txn/data pos that are < vvv are for sure invalid // txn/data pos that are < vvv are for sure invalid
txnValidFrom = int64(len(Magic)) txnValidFrom = int64(len(Magic)) // XXX = FileHeaderSize
dataValidFrom = txnValidFrom + TxnHeaderFixSize dataValidFrom = txnValidFrom + TxnHeaderFixSize
// invalid length that indicates start of iteration for TxnHeader LoadNext/LoadPrev
// NOTE load routines check loaded fields to be valid and .Len in particular,
// so it can never come to us from outside to be this.
lenIterStart int64 = -0x1111111111111112 // = 0xeeeeeeeeeeeeeeee if unsigned
) )
// ErrTxnRecord is returned on transaction record read / decode errors // ErrTxnRecord is returned on transaction record read / decode errors
...@@ -435,6 +441,19 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -435,6 +441,19 @@ func (txnh *TxnHeader) LoadPrev(r io.ReaderAt, flags TxnLoadFlags) error {
bug(txnh, "LoadPrev() when .LenPrev == error") bug(txnh, "LoadPrev() when .LenPrev == error")
case -1: case -1:
return io.EOF return io.EOF
case lenIterStart:
// start of iteration backward:
// read LenPrev @pos, then tail to LoadPrev
pos := txnh.Pos
err := txnh.Load(r, pos, flags)
if txnh.LenPrev == 0 {
if err == nil {
panic("nil err with txnh.LenPrev = error")
}
return err
}
return txnh.LoadPrev(r, flags)
} }
lenCur := txnh.Len lenCur := txnh.Len
...@@ -471,6 +490,10 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error { ...@@ -471,6 +490,10 @@ func (txnh *TxnHeader) LoadNext(r io.ReaderAt, flags TxnLoadFlags) error {
bug(txnh, "LoadNext() when .Len == error") bug(txnh, "LoadNext() when .Len == error")
case -1: case -1:
return io.EOF return io.EOF
case lenIterStart:
// start of iteration forward
return txnh.Load(r, posCur, flags)
} }
// valid .Len means txnh was read ok // valid .Len means txnh was read ok
...@@ -729,6 +752,63 @@ func (dh *DataHeader) LoadData(r io.ReaderAt, buf *[]byte) error { ...@@ -729,6 +752,63 @@ func (dh *DataHeader) LoadData(r io.ReaderAt, buf *[]byte) error {
return nil return nil
} }
// --- raw iteration ---
// Iter is combined 2-level iterator over transaction and data records
type Iter struct {
R io.ReaderAt
Dir IterDir
Txnh TxnHeader // current transaction record information
Datah DataHeader // current data record information
}
type IterDir int
const (
IterForward IterDir = iota
IterBackward
)
// NextTxn iterates to next/previous transaction record according to iteration direction.
// The data header is reset to iterate inside transaction record that becomes current.
func (it *Iter) NextTxn(flags TxnLoadFlags) error {
var err error
switch it.Dir {
case IterForward:
err = it.Txnh.LoadNext(it.R, flags)
case IterBackward:
err = it.Txnh.LoadPrev(it.R, flags)
default:
panic("Iter.Dir invalid")
}
//fmt.Println("loaded:", it.Txnh.Tid)
if err != nil {
// reset .Datah to be invalid (just in case)
it.Datah.Pos = 0
it.Datah.DataLen = 0
} else {
// set .Datah to iterate over .Txnh
it.Datah.Pos = it.Txnh.DataPos()
it.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
}
return err
}
// NextData iterates to next data record header inside current transaction
func (it *Iter) NextData() error {
return it.Datah.LoadNext(it.R, &it.Txnh)
}
// Iterate creates Iter to iterate over r starting from posStart in direction dir
func Iterate(r io.ReaderAt, posStart int64, dir IterDir) *Iter {
it := &Iter{R: r, Dir: dir, Txnh: TxnHeader{Pos: posStart, Len: lenIterStart}}
return it
}
// --- FileStorage --- // --- FileStorage ---
func (fs *FileStorage) StorageName() string { func (fs *FileStorage) StorageName() string {
...@@ -746,13 +826,10 @@ func open(path string) (*FileStorage, error) { ...@@ -746,13 +826,10 @@ func open(path string) (*FileStorage, error) {
fs.file = f fs.file = f
// check file magic // check file magic
var xxx [len(Magic)]byte fh := FileHeader{}
_, err = f.ReadAt(xxx[:], 0) err = fh.Load(f)
if err != nil { if err != nil {
return nil, err // XXX err more context return nil, err
}
if string(xxx[:]) != Magic {
return nil, fmt.Errorf("%s: invalid magic %q", path, xxx) // XXX err?
} }
/* /*
...@@ -914,75 +991,15 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) ...@@ -914,75 +991,15 @@ func (fs *FileStorage) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error)
return data, tid, nil return data, tid, nil
} }
// --- raw iteration ---
// Iter is combined 2-level iterator over transaction and data records
type Iter struct {
fsSeq *xbufio.SeqReaderAt
Flags iterFlags // XXX iterate forward (> 0) / backward (< 0) / EOF reached (== 0)
Txnh TxnHeader // current transaction record information
Datah DataHeader // current data record information
}
// NextTxn iterates to next/previous transaction record according to iteration direction.
// The data header is reset to iterate inside transaction record that became current.
func (it *Iter) NextTxn(flags TxnLoadFlags) error {
var err error
if it.Flags & iterDir != 0 {
err = it.Txnh.LoadNext(it.fsSeq, flags)
} else {
err = it.Txnh.LoadPrev(it.fsSeq, flags)
}
//fmt.Println("loaded:", it.Txnh.Tid)
if err != nil {
// reset .Datah to be invalid (just in case)
it.Datah.Pos = 0
it.Datah.DataLen = 0
} else {
// set .Datah to iterate over .Txnh
it.Datah.Pos = it.Txnh.DataPos()
it.Datah.DataLen = -DataHeaderSize // first iteration will go to first data record
}
return err
}
// NextData iterates to next data record header inside current transaction
func (it *Iter) NextData() error {
return it.Datah.LoadNext(it.fsSeq, &it.Txnh)
}
// IterateRaw ... XXX
func (fs *FileStorage) IterateRaw(dir/*XXX fwd/back*/) *Iter {
// when iterating use IO optimized for sequential access
fsSeq := xbufio.NewSeqReaderAt(fs.file)
// XXX setup .TxnIter.dir and start
it := &Iter{fsSeq: fsSeq} // XXX ...
//it.DataIter.Txnh = &iter.txnIter.Txnh
return it
}
// --- zodb.IStorage iteration --- // --- zodb.IStorage iteration ---
type iterFlags int // zIter is combined transaction/data-records iterator as specified by zodb.IStorage
const (
iterDir iterFlags = 1 << iota // iterate forward (1) or backward (0)
iterEOF // EOF reached
iterPreloaded // data for this iteration was already preloaded
)
// zIter is transaction/data-records iterator as specified by zodb.IStorage
type zIter struct { type zIter struct {
iter Iter iter Iter
TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir TidStop zodb.Tid // iterate up to tid <= tidStop | tid >= tidStop depending on .dir
Flags iterFlags zFlags zIterFlags
// data header for data loading // data header for data loading
// ( NOTE: need to use separate dh because x.LoadData() changes x state // ( NOTE: need to use separate dh because x.LoadData() changes x state
...@@ -995,17 +1012,23 @@ type zIter struct { ...@@ -995,17 +1012,23 @@ type zIter struct {
dataBuf []byte dataBuf []byte
} }
type zIterFlags int
const (
zIterEOF zIterFlags = 1 << iota // EOF reached
zIterPreloaded // data for this iteration was already preloaded
)
// NextTxn iterates to next/previous transaction record according to iteration direction // NextTxn iterates to next/previous transaction record according to iteration direction
func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) { func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
switch { switch {
case zi.Flags & iterEOF != 0: case zi.zFlags & zIterEOF != 0:
//println("already eof") //println("already eof")
return nil, nil, io.EOF return nil, nil, io.EOF
// XXX needed? // XXX needed?
case zi.Flags & iterPreloaded != 0: case zi.zFlags & zIterPreloaded != 0:
// first element is already there - preloaded by who initialized TxnIter // first element is already there - preloaded by who initialized TxnIter
zi.Flags &= ^iterPreloaded zi.zFlags &= ^zIterPreloaded
//fmt.Println("preloaded:", zi.Txnh.Tid) //fmt.Println("preloaded:", zi.Txnh.Tid)
default: default:
...@@ -1017,10 +1040,10 @@ func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) { ...@@ -1017,10 +1040,10 @@ func (zi *zIter) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
} }
// XXX how to make sure last good txnh is preserved? // XXX how to make sure last good txnh is preserved?
if (zi.Flags&iterDir != 0 && zi.iter.Txnh.Tid > zi.TidStop) || if (zi.iter.Dir == IterForward && zi.iter.Txnh.Tid > zi.TidStop) ||
(zi.Flags&iterDir == 0 && zi.iter.Txnh.Tid < zi.TidStop) { (zi.iter.Dir == IterBackward && zi.iter.Txnh.Tid < zi.TidStop) {
//println("-> EOF") //println("-> EOF")
zi.Flags |= iterEOF zi.zFlags |= zIterEOF
return nil, nil, io.EOF return nil, nil, io.EOF
} }
...@@ -1067,10 +1090,11 @@ type iterStartError struct { ...@@ -1067,10 +1090,11 @@ type iterStartError struct {
err error err error
} }
func (e *iterStartError) NextTxn(*zodb.TxnInfo, zodb.IStorageIterator, error) { func (e *iterStartError) NextTxn() (*zodb.TxnInfo, zodb.IStorageRecordIterator, error) {
return nil, nil, e.err return nil, nil, e.err
} }
// Iterate creates zodb-level iterator for tidMin..tidMax range
func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
//fmt.Printf("iterate %v..%v\n", tidMin, tidMax) //fmt.Printf("iterate %v..%v\n", tidMin, tidMax)
// FIXME case when only 0 or 1 txn present // FIXME case when only 0 or 1 txn present
...@@ -1081,35 +1105,34 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1081,35 +1105,34 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
tidMax = fs.txnhMax.Tid tidMax = fs.txnhMax.Tid
} }
// XXX naming -> ziter? ziter := &zIter{iter: Iter{}}
ziter := &zIter{iter}
// when iterating use IO optimized for sequential access // when iterating use IO optimized for sequential access
// XXX -> IterateRaw // XXX -> IterateRaw ?
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
iter.txnIter.fsSeq = fsSeq ziter.iter.fsSeq = fsSeq
iter.dataIter.fsSeq = fsSeq
iter.dataIter.Txnh = &iter.txnIter.Txnh
if tidMin > tidMax { if tidMin > tidMax {
Iter.txnIter.Flags |= iterEOF // empty ziter.zFlags |= zIterEOF // empty
return &Iter return ziter
} }
// scan either from file start or end, depending which way it is likely closer, to tidMin // scan either from file start or end, depending which way it is likely closer, to tidMin
// XXX put iter into ptr to Iter ^^^ // XXX put iter into ptr to Iter ^^^
iter := &Iter.txnIter iter := &ziter.iter
if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) { if (tidMin - fs.txnhMin.Tid) < (fs.txnhMax.Tid - tidMin) {
//fmt.Printf("forward %.1f%%\n", 100 * float64(tidMin - fs.txnhMin.Tid) / float64(fs.txnhMax.Tid - fs.txnhMin.Tid)) //fmt.Printf("forward %.1f%%\n", 100 * float64(tidMin - fs.txnhMin.Tid) / float64(fs.txnhMax.Tid - fs.txnhMin.Tid))
iter.Flags = 1*iterDir | iterPreloaded iter.Dir = IterForward
iter.Txnh.CloneFrom(&fs.txnhMin) iter.Txnh.CloneFrom(&fs.txnhMin)
iter.TidStop = tidMin - 1 // XXX overflow ziter.zFlags = zIterPreloaded
ziter.TidStop = tidMin - 1 // XXX overflow
} else { } else {
//fmt.Printf("backward %.1f%%\n", 100 * float64(tidMin - fs.txnhMin.Tid) / float64(fs.txnhMax.Tid - fs.txnhMin.Tid)) //fmt.Printf("backward %.1f%%\n", 100 * float64(tidMin - fs.txnhMin.Tid) / float64(fs.txnhMax.Tid - fs.txnhMin.Tid))
iter.Flags = 0*iterDir | iterPreloaded iter.Dir = IterBackward
iter.Txnh.CloneFrom(&fs.txnhMax) iter.Txnh.CloneFrom(&fs.txnhMax)
iter.TidStop = tidMin ziter.zFlags = zIterPreloaded
ziter.TidStop = tidMin
} }
// XXX recheck how we enter loop // XXX recheck how we enter loop
...@@ -1130,19 +1153,19 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1130,19 +1153,19 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// where to start around tidMin found - let's reinitialize iter to // where to start around tidMin found - let's reinitialize iter to
// iterate appropriately forward up to tidMax // iterate appropriately forward up to tidMax
iter.Flags &= ^iterEOF ziter.zFlags &= ^zIterEOF
if iter.Flags&iterDir != 0 { if iter.Dir == IterForward {
// when ^^^ we were searching forward first txn was already found // when ^^^ we were searching forward first txn was already found
err = iter.Txnh.loadStrings(fsSeq) // XXX ok? XXX -> move NextTxn() ? err = iter.Txnh.loadStrings(fsSeq) // XXX ok? XXX -> move NextTxn() ?
if err != nil { if err != nil {
return &iterStartError{err} return &iterStartError{err}
} }
iter.Flags |= iterPreloaded ziter.zFlags |= zIterPreloaded
} }
iter.Flags |= iterDir iter.Dir = IterForward
iter.TidStop = tidMax ziter.TidStop = tidMax
return &Iter return ziter
} }
// computeIndex builds new in-memory index for FileStorage // computeIndex builds new in-memory index for FileStorage
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment