Commit 08f2c436 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5adfa0c8
...@@ -1256,141 +1256,11 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator { ...@@ -1256,141 +1256,11 @@ func (fs *FileStorage) Iterate(tidMin, tidMax zodb.Tid) zodb.IStorageIterator {
// --- rebuilding index --- // --- rebuilding index ---
// UpdateIndex updates in-memory index from r's data in byte-range index.TopPos..topPos
//
// The use case is: we have index computed till some position; we open
// FileStorage and see there is more data; we update index from data range
// not-yet covered by the index.
//
// topPos=-1 means range to update from is index.TopPos..EOF
//
// XXX on error existing index is in invalid state?
func UpdateIndex(ctx context.Context, index *Index, r io.ReaderAt, topPos int64) (err error) {
defer xerr.Contextf(&err, "%s: reindex", xio.Name(r))
// XXX err ctx
defer func() {
// XXX it is possible to rollback index to consistent state to
// last wholly-processed txn (during every txn keep updating {}
// and if txn was not completed - rollback from that {})
if err != nil {
index.Clear()
index.TopPos = txnValidFrom
}
}()
// XXX if topPos >= 0 && topPos < index.TopPos {
// error
// }
// XXX another way to compute index: iterate backwards - then
// 1. index entry for oid is ready right after we see oid the first time
// 2. we can be sure we build the whole index if we saw all oids
it := Iterate(r, index.TopPos, IterForward)
for {
// check ctx cancel once per transaction
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err = it.NextTxn(LoadNoStrings)
if err != nil {
return okEOF(err)
}
// XXX check txnh.Status != TxnInprogress
// check for overlapping txn & whether we are done.
// topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) > topPos {
return fmt.Errorf("transaction %v @%v overlaps requested topPos %v",
it.Txnh.Tid, it.Txnh.Pos, topPos)
}
if it.Txnh.Pos == topPos {
return nil
}
index.TopPos = it.Txnh.Pos + it.Txnh.Len
for {
err = it.NextData()
if err != nil {
err = okEOF(err)
if err != nil {
return err
}
break
}
index.Set(it.Datah.Oid, it.Datah.Pos)
}
}
return nil
}
// BuildIndex builds new in-memory index for data in r
// XXX in case of error return partially built index? (index has .TopPos until which it covers the data)
func BuildIndex(ctx context.Context, r io.ReaderAt) (index *Index, err error) {
index = IndexNew()
index.TopPos = txnValidFrom
err = UpdateIndex(ctx, index, r, -1)
if err != nil {
return nil, err
}
return index, nil
}
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) {
fsSeq := xbufio.NewSeqReaderAt(fs.file) fsSeq := xbufio.NewSeqReaderAt(fs.file)
return BuildIndex(ctx, fsSeq) return BuildIndex(ctx, fsSeq)
} }
// checkIndexSane quickly checks index sanity.
// It scans few transactions starting from index.TopPos backwards and verifies
// whether oid there have correct entries in the index.
// XXX return: ? (how calling code should distinguish IO error on main file from consistency check error)
func checkIndexSane(index *Index, r io.ReaderAt) (err error) {
defer xerr.Contextf(&err, "index quick check") // XXX +main file
it := Iterate(r, index.TopPos, IterBackward)
for i := 0; i < 10; i++ {
err := it.NextTxn(LoadNoStrings)
if err != nil {
err = okEOF(err)
return err // XXX err ctx
}
for {
err = it.NextData()
if err != nil {
if err == io.EOF {
break
}
return err // XXX err ctx
}
dataPos, ok := index.Get(it.Datah.Oid)
if !ok {
return fmt.Errorf("oid %v @%v: no index entry", it.Datah.Oid, it.Datah.Pos)
}
if dataPos != it.Datah.Pos {
return fmt.Errorf("oid %v @%v: index has wrong pos (%v)", it.Datah.Oid, it.Datah.Pos, dataPos)
}
}
}
return nil
}
// loadIndex loads on-disk index to RAM // loadIndex loads on-disk index to RAM
func (fs *FileStorage) loadIndex() (err error) { func (fs *FileStorage) loadIndex() (err error) {
// XXX lock? // XXX lock?
...@@ -1431,19 +1301,19 @@ func (fs *FileStorage) saveIndex() (err error) { ...@@ -1431,19 +1301,19 @@ func (fs *FileStorage) saveIndex() (err error) {
// IndexCorruptError is the error returned when index verification fails // IndexCorruptError is the error returned when index verification fails
// XXX but io errors during verification return not this // XXX but io errors during verification return not this
type IndexCorruptError struct { type indexCorruptError struct {
index *Index index *Index
indexOk *Index indexOk *Index
} }
func (e *IndexCorruptError) Error() string { func (e *indexCorruptError) Error() string {
// TODO show delta ? // TODO show delta ?
return "index corrupt" return "index corrupt"
} }
// VerifyIndex verifies that index is correct // VerifyIndex verifies that index is correct
// XXX -> not exported @ fs1 // XXX -> not exported @ fs1
func (fs *FileStorage) VerifyIndex(ctx context.Context) error { func (fs *FileStorage) verifyIndex(ctx context.Context) error {
// XXX lock appends? // XXX lock appends?
// XXX if .index is not yet loaded - load it // XXX if .index is not yet loaded - load it
...@@ -1454,7 +1324,7 @@ func (fs *FileStorage) VerifyIndex(ctx context.Context) error { ...@@ -1454,7 +1324,7 @@ func (fs *FileStorage) VerifyIndex(ctx context.Context) error {
} }
if !indexOk.Equal(fs.index) { if !indexOk.Equal(fs.index) {
err = &IndexCorruptError{index: fs.index, indexOk: indexOk} err = &indexCorruptError{index: fs.index, indexOk: indexOk}
} }
return err return err
...@@ -1463,7 +1333,7 @@ func (fs *FileStorage) VerifyIndex(ctx context.Context) error { ...@@ -1463,7 +1333,7 @@ func (fs *FileStorage) VerifyIndex(ctx context.Context) error {
// Reindex rebuilds the index // Reindex rebuilds the index
// XXX -> not exported @ fs1 // XXX -> not exported @ fs1
func (fs *FileStorage) Reindex(ctx context.Context) error { func (fs *FileStorage) reindex(ctx context.Context) error {
// XXX lock appends? // XXX lock appends?
index, err := fs.computeIndex(ctx) index, err := fs.computeIndex(ctx)
......
...@@ -40,8 +40,8 @@ import ( ...@@ -40,8 +40,8 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/xio" "lab.nexedi.com/kirr/neo/go/xcommon/xio"
) )
// Index is Oid -> Data record position mapping used to associate Oid with // Index is in-RAM Oid -> Data record position mapping used to associate Oid
// Data record in latest transaction which changed it. // with Data record in latest transaction which changed it.
type Index struct { type Index struct {
// this index covers data file up to < .TopPos // this index covers data file up to < .TopPos
// usually for whole-file index TopPos is position pointing just past // usually for whole-file index TopPos is position pointing just past
...@@ -56,9 +56,11 @@ func IndexNew() *Index { ...@@ -56,9 +56,11 @@ func IndexNew() *Index {
return &Index{Tree: fsb.TreeNew()} return &Index{Tree: fsb.TreeNew()}
} }
// NOTE Get/Set/... are taken as-is from fsb.Tree // NOTE Get/Set/... are taken as-is from fsb.Tree
// --- index load/save ---
// on-disk index format // on-disk index format
// (changed in 2010 in https://github.com/zopefoundation/ZODB/commit/1bb14faf) // (changed in 2010 in https://github.com/zopefoundation/ZODB/commit/1bb14faf)
// //
...@@ -164,7 +166,7 @@ out: ...@@ -164,7 +166,7 @@ out:
return &IndexSaveError{err} return &IndexSaveError{err}
} }
// SaveFile saves index to a file // SaveFile saves index to a file @ path
func (fsi *Index) SaveFile(path string) (err error) { func (fsi *Index) SaveFile(path string) (err error) {
f, err := os.Create(path) f, err := os.Create(path)
if err != nil { if err != nil {
...@@ -391,3 +393,184 @@ func treeEqual(a, b *fsb.Tree) bool { ...@@ -391,3 +393,184 @@ func treeEqual(a, b *fsb.Tree) bool {
return true return true
} }
// --- build/verify index from FileStorage data ---
// Update updates in-memory index from r's FileStorage data in byte-range index.TopPos..topPos
//
// The use case is: we have index computed till some position; we open
// FileStorage and see there is more data; we update index from data range
// not-yet covered by the index.
//
// topPos=-1 means range to update from is index.TopPos..EOF
//
// XXX on error existing index is in invalid state?
// XXX naming (more specific -> UpdateXXX as just Update() is confusing)
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64) (err error) {
defer xerr.Contextf(&err, "%s: reindex", xio.Name(r))
// XXX err ctx
defer func() {
// XXX it is possible to rollback index to consistent state to
// last wholly-processed txn (during every txn keep updating {}
// and if txn was not completed - rollback from that {})
if err != nil {
index.Clear()
index.TopPos = txnValidFrom
}
}()
// XXX if topPos >= 0 && topPos < index.TopPos {
// error
// }
// XXX another way to compute index: iterate backwards - then
// 1. index entry for oid is ready right after we see oid the first time
// 2. we can be sure we build the whole index if we saw all oids
it := Iterate(r, index.TopPos, IterForward)
for {
// check ctx cancel once per transaction
select {
case <-ctx.Done():
return ctx.Err()
default:
}
err = it.NextTxn(LoadNoStrings)
if err != nil {
// XXX if EOF earlier topPos -> error
return okEOF(err)
}
// XXX check txnh.Status != TxnInprogress
// check for overlapping txn & whether we are done.
// topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos + it.Txnh.Len) > topPos {
return fmt.Errorf("transaction %v @%v overlaps requested topPos %v",
it.Txnh.Tid, it.Txnh.Pos, topPos)
}
if it.Txnh.Pos == topPos {
return nil
}
index.TopPos = it.Txnh.Pos + it.Txnh.Len
for {
err = it.NextData()
if err != nil {
err = okEOF(err)
if err != nil {
return err
}
break
}
index.Set(it.Datah.Oid, it.Datah.Pos)
}
}
return nil
}
// BuildIndex builds new in-memory index for data in r
// XXX in case of error return partially built index? (index has .TopPos until which it covers the data)
func BuildIndex(ctx context.Context, r io.ReaderAt) (index *Index, err error) {
index = IndexNew()
index.TopPos = txnValidFrom
err = UpdateIndex(ctx, index, r, -1)
if err != nil {
return nil, err
}
return index, nil
}
// VerifyNTxn checks index correctness against several transactions of FileStorage data in r.
//
// It scans ntxn transactions starting from index.TopPos backwards and verifies
// whether oid there have correct entries in the index.
// XXX return: ? (how calling code should distinguish IO error on main file from consistency check error)
// XXX naming?
func (index *Index) VerifyNTxn(ctx context.Context, r io.ReaderAt, ntxn int) (oidChecked map[zodb.Oid]struct{}, err error) {
defer xerr.Contextf(&err, "index quick check") // XXX +main file
it := Iterate(r, index.TopPos, IterBackward)
oidSeen := map[zodb.Oid]struct{}{} // Set<zodb.Oid>
// XXX ntxn=-1 - check all
for i := 0; i < ntxn; i++ {
// XXX handle ctx cancel
err := it.NextTxn(LoadNoStrings)
if err != nil {
err = okEOF(err)
return err // XXX err ctx
}
for {
err = it.NextData()
if err != nil {
if err == io.EOF {
break
}
return err // XXX err ctx
}
// if oid was already checked - do not check index anymore
// (index has info only about latest entries)
if _, seen := oidSeen[it.Datah.Oid]; seen {
continue
}
oidSeen[id.Datah.Oid] = struct{}{}
dataPos, ok := index.Get(it.Datah.Oid)
if !ok {
return fmt.Errorf("oid %v @%v: no index entry", it.Datah.Oid, it.Datah.Pos)
}
if dataPos != it.Datah.Pos {
return fmt.Errorf("oid %v @%v: index has wrong pos (%v)", it.Datah.Oid, it.Datah.Pos, dataPos)
}
}
}
return nil
}
// Verify checks index correctness against FileStorage data in r.
//
// it verifies whether index is exactly the same as if it was build anew for
// data in range ..index.TopPos .
func (index *Index) Verify(ctx context.Context, r io.ReaderAt) error {
oidChecked, err := index.VerifyNTxn(ctx, r, -1)
if err != nil {
return err
}
// all oids from data were checked to be in index
// now verify that there is no extra oids in index
if len(oidChecked) == index.Len() {
return nil
}
e, _ = index.SeekFirst() // !nil as nil means index.Len=0 and len(oidChecked) <= index.Len
defer e.Close()
for {
oid, pos, errStop := e.Next()
if errStop != nil {
break
}
if _, ok := oidChecked[oid]; !ok {
return fmt.Errorf("oid %v: present in index but not in data", oid)
}
}
return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment