Commit 5adfa0c8 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a66fd108
...@@ -1256,7 +1256,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1256,7 +1256,7 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// --- rebuilding index --- // --- rebuilding index ---
// updateIndex updates index from r's data in byte-range index.TopPos..topPos // UpdateIndex updates in-memory index from r's data in byte-range index.TopPos..topPos
// //
// The use case is: we have index computed till some position; we open // The use case is: we have index computed till some position; we open
// FileStorage and see there is more data; we update index from data range // FileStorage and see there is more data; we update index from data range
...@@ -1265,9 +1265,14 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1265,9 +1265,14 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// topPos=-1 means range to update from is index.TopPos..EOF // topPos=-1 means range to update from is index.TopPos..EOF
// //
// XXX on error existing index is in invalid state? // XXX on error existing index is in invalid state?
func updateIndex(ctx context.Context, index *Index, r io.ReaderAt, topPos int64) (err error) { func UpdateIndex(ctx context.Context, index *Index, r io.ReaderAt, topPos int64) (err error) {
defer xerr.Contextf(&err, "%s: reindex", xio.Name(r))
// XXX err ctx // XXX err ctx
defer func() { defer func() {
// XXX it is possible to rollback index to consistent state to
// last wholly-processed txn (during every txn keep updating {}
// and if txn was not completed - rollback from that {})
if err != nil { if err != nil {
index.Clear() index.Clear()
index.TopPos = txnValidFrom index.TopPos = txnValidFrom
...@@ -1278,9 +1283,12 @@ func updateIndex(ctx context.Context, index *Index, r io.ReaderAt, topPos int64) ...@@ -1278,9 +1283,12 @@ func updateIndex(ctx context.Context, index *Index, r io.ReaderAt, topPos int64)
// error // error
// } // }
// XXX another way to compute index: iterate backwards - then
// 1. index entry for oid is ready right after we see oid the first time
// 2. we can be sure we build the whole index if we saw all oids
it := Iterate(r, index.TopPos, IterForward) it := Iterate(r, index.TopPos, IterForward)
loop:
for { for {
// check ctx cancel once per transaction // check ctx cancel once per transaction
select { select {
...@@ -1291,15 +1299,14 @@ loop: ...@@ -1291,15 +1299,14 @@ loop:
err = it.NextTxn(LoadNoStrings) err = it.NextTxn(LoadNoStrings)
if err != nil { if err != nil {
err = okEOF(err) return okEOF(err)
break
} }
// XXX check txnh.Status != TxnInprogress // XXX check txnh.Status != TxnInprogress
// check for overlapping txn & whether we are done. // check for overlapping txn & whether we are done.
// topPos=-1 will never match here // topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) >= topPos { if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) > topPos {
return fmt.Errorf("transaction %v @%v overlaps requested topPos %v", return fmt.Errorf("transaction %v @%v overlaps requested topPos %v",
it.Txnh.Tid, it.Txnh.Pos, topPos) it.Txnh.Tid, it.Txnh.Pos, topPos)
} }
...@@ -1314,7 +1321,7 @@ loop: ...@@ -1314,7 +1321,7 @@ loop:
if err != nil { if err != nil {
err = okEOF(err) err = okEOF(err)
if err != nil { if err != nil {
break loop return err
} }
break break
} }
...@@ -1323,35 +1330,28 @@ loop: ...@@ -1323,35 +1330,28 @@ loop:
} }
} }
if err != nil {
return err
}
return nil return nil
} }
// computeIndex builds new in-memory index for FileStorage // BuildIndex builds new in-memory index for data in r
// XXX naming
// XXX in case of error return partially built index? (index has .TopPos until which it covers the data) // XXX in case of error return partially built index? (index has .TopPos until which it covers the data)
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { func BuildIndex(ctx context.Context, r io.ReaderAt) (index *Index, err error) {
defer xerr.Contextf(&err, "%s: reindex", fs.file.Name())
index = IndexNew() index = IndexNew()
index.TopPos = txnValidFrom index.TopPos = txnValidFrom
// XXX another way to compute index: iterate backwards - then err = UpdateIndex(ctx, index, r, -1)
// 1. index entry for oid is ready right after we see oid the first time
// 2. we can be sure we build the whole index if we saw all oids
fsSeq := xbufio.NewSeqReaderAt(fs.file)
err = updateIndex(ctx, index, fsSeq, -1)
if err != nil { if err != nil {
return nil, err // XXX ok? return nil, err
} }
return index, nil return index, nil
} }
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
fsSeq := xbufio.NewSeqReaderAt(fs.file)
return BuildIndex(ctx, fsSeq)
}
// checkIndexSane quickly checks index sanity. // checkIndexSane quickly checks index sanity.
// It scans few transactions starting from index.TopPos backwards and verifies // It scans few transactions starting from index.TopPos backwards and verifies
// whether oid there have correct entries in the index. // whether oid there have correct entries in the index.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment