Commit d3bf6538 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/fs1: Add routines to (re)build and verify index from/wrt original FileStorage data

parent 8fa9fdaf
......@@ -23,6 +23,7 @@ package fs1
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
......@@ -398,3 +399,319 @@ func treeEqual(a, b *fsb.Tree) bool {
return true
}
// --- build index from FileStorage data ---
// IndexUpdateProgress is data sent by Index.Update to notify about progress
type IndexUpdateProgress struct {
TopPos int64 // data range to update to; if = -1 -- till EOF
TxnIndexed int // # transactions read/indexed so far
Index *Index // index built so far
Iter *Iter // iterator through data XXX needed?
}
// Update updates in-memory index from r's FileStorage data in byte-range index.TopPos..topPos
//
// The use case is: we have index computed till some position; we open
// FileStorage and see there is more data; we update index from data range
// not-yet covered by the index.
//
// topPos=-1 means data range to update from is index.TopPos..EOF
//
// The index stays valid even in case of error - then index is updated but only
// partially. The index always stays consistent as updates to it are applied as
// a whole once per data transaction. On return index.TopPos indicates till
// which position in data the index could be updated.
//
// On success returned error is nil and index.TopPos is set to either:
// - topPos (if it is != -1), or
// - r's position at which read got EOF (if topPos=-1).
func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, progress func(*IndexUpdateProgress)) (err error) {
defer xerr.Contextf(&err, "%sreindex %v..%v", ioprefix(r), index.TopPos, topPos)
if topPos >= 0 && index.TopPos > topPos {
return fmt.Errorf("backward update requested")
}
// XXX another way to compute index: iterate backwards - then
// 1. index entry for oid is ready right after we see oid the first time
// 2. we can be sure we build the whole index if we saw all oids
it := Iterate(r, index.TopPos, IterForward)
pd := &IndexUpdateProgress{
TopPos: topPos,
Index: index,
Iter: it,
}
for {
// check ctx cancel once per transaction
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// iter to next txn
err = it.NextTxn(LoadNoStrings)
if err != nil {
err = okEOF(err)
if err == nil {
// if EOF earlier topPos -> error
if topPos >= 0 && index.TopPos < topPos {
err = fmt.Errorf("unexpected EOF @%v", index.TopPos)
}
}
return err
}
// XXX check txnh.Status != TxnInprogress
// check for topPos overlapping txn & whether we are done.
// topPos=-1 will never match here
if it.Txnh.Pos < topPos && (it.Txnh.Pos+it.Txnh.Len) > topPos {
return fmt.Errorf("transaction %v @%v overlaps topPos boundary",
it.Txnh.Tid, it.Txnh.Pos)
}
if it.Txnh.Pos == topPos {
return nil
}
// collect data for index update in temporary place.
// do not update the index immediately so that in case of error
// in the middle of txn's data, index stays consistent and
// correct for topPos pointing to previous transaction.
update := map[zodb.Oid]int64{} // XXX malloc every time -> better reuse
for {
err = it.NextData()
if err != nil {
err = okEOF(err)
if err != nil {
return err
}
break
}
update[it.Datah.Oid] = it.Datah.Pos
}
// update index "atomically" with data from just read transaction
index.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range update {
index.Set(oid, pos)
}
// notify progress
if progress != nil {
pd.TxnIndexed++
progress(pd)
}
}
return nil
}
// BuildIndex builds new in-memory index for data in r.
//
// non-nil valid and consistent index is always returned - even in case of error
// the index will describe data till top-position of highest transaction that
// could be read without error.
//
// In such cases the index building could be retried to be finished with
// index.Update().
func BuildIndex(ctx context.Context, r io.ReaderAt, progress func(*IndexUpdateProgress)) (*Index, error) {
index := IndexNew()
err := index.Update(ctx, r, -1, progress)
return index, err
}
// BuildIndexForFile builds new in-memory index for data in file @ path.
//
// See BuildIndex for semantic description.
func BuildIndexForFile(ctx context.Context, path string, progress func(*IndexUpdateProgress)) (index *Index, err error) {
f, err := os.Open(path)
if err != nil {
return IndexNew(), err
}
defer func() {
err2 := f.Close()
err = xerr.First(err, err2)
}()
// use IO optimized for sequential access when building index
fSeq := seqReadAt(f)
return BuildIndex(ctx, fSeq, progress)
}
// --- verify index against data in FileStorage ---
// IndexCorruptError is the error type returned by index verification routines
// when index was found to not match original FileStorage data.
type IndexCorruptError struct {
DataFileName string // present if data IO object was with .Name()
Detail string
}
func (e *IndexCorruptError) Error() string {
prefix := e.DataFileName
if prefix != "" {
prefix += ": "
}
return fmt.Sprintf("%sverify index: %s", prefix, e.Detail)
}
func indexCorrupt(f interface{}, format string, argv ...interface{}) *IndexCorruptError {
return &IndexCorruptError{DataFileName: ioname(f), Detail: fmt.Sprintf(format, argv...)}
}
// IndexVerifyProgress is data sent by Index.Verify to notify about progress
type IndexVerifyProgress struct {
TxnTotal int // total # of transactions to verify; if = -1 -- whole data
TxnChecked int
Index *Index // index verification runs for
Iter *Iter // iterator through data
OidChecked map[zodb.Oid]struct{} // oid checked so far
}
// Verify checks index correctness against FileStorage data in r.
//
// For ntxn transactions starting from index.TopPos backwards, it verifies
// whether oid there have correct entries in the index.
//
// ntxn=-1 means data range to verify is till start of the file.
//
// If whole data file was covered (either ntxn is big enough or was set = -1)
// additional checks are performed to make sure there is no extra entries in
// the index. For whole-data file cases Verify thus checks whether index is
// exactly the same as if it was build anew for data in range ..index.TopPos .
//
// Returned error is either:
// - of type *IndexCorruptError, when data in index was found not to match original data, or
// - any other error type representing e.g. IO error when reading original data or something else.
func (index *Index) Verify(ctx context.Context, r io.ReaderAt, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) {
defer func() {
if _, ok := err.(*IndexCorruptError); ok {
return // leave it as is
}
xerr.Contextf(&err, "%sverify index @%v~{%v}", ioprefix(r), index.TopPos, ntxn)
}()
oidChecked = map[zodb.Oid]struct{}{} // Set<zodb.Oid>
wholeData := false
it := Iterate(r, index.TopPos, IterBackward)
pd := &IndexVerifyProgress{
TxnTotal: ntxn,
Index: index,
Iter: it,
OidChecked: oidChecked,
}
for i := 0; ntxn == -1 || i < ntxn; i++ {
// check ctx cancel once per transaction
select {
case <-ctx.Done():
return oidChecked, ctx.Err()
default:
}
err := it.NextTxn(LoadNoStrings)
if err != nil {
if err == io.EOF {
wholeData = true
break
}
return oidChecked, err
}
for {
err = it.NextData()
if err != nil {
if err == io.EOF {
break
}
return oidChecked, err
}
// if oid was already checked - do not check index anymore
// (index has info only about latest entries)
if _, ok := oidChecked[it.Datah.Oid]; ok {
continue
}
oidChecked[it.Datah.Oid] = struct{}{}
dataPos, ok := index.Get(it.Datah.Oid)
if !ok {
return oidChecked, indexCorrupt(r, "oid %v @%v: no index entry",
it.Datah.Oid, it.Datah.Pos)
}
if dataPos != it.Datah.Pos {
return oidChecked, indexCorrupt(r, "oid %v @%v: index has wrong pos (%v)",
it.Datah.Oid, it.Datah.Pos, dataPos)
}
}
// notify progress
if progress != nil {
pd.TxnChecked++
progress(pd)
}
}
// all oids from data were checked to be in index
// now verify that there is no extra oids in index
if wholeData && len(oidChecked) != index.Len() {
// !nil as nil means index.Len=0 and len(oidChecked) < index.Len here
e, _ := index.SeekFirst()
defer e.Close()
for {
oid, pos, errStop := e.Next()
if errStop != nil {
break
}
if _, ok := oidChecked[oid]; !ok {
return oidChecked, indexCorrupt(r, "oid %v @%v: present in index but not in data", oid, pos)
}
}
}
return oidChecked, nil
}
// VerifyForFile checks index correctness against FileStorage data in file @ path
//
// See Verify for semantic description.
func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, progress func(*IndexVerifyProgress)) (oidChecked map[zodb.Oid]struct{}, err error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer func() {
err2 := f.Close()
err = xerr.First(err, err2)
}()
fi, err := f.Stat()
if err != nil {
return nil, err
}
topPos := fi.Size() // XXX there might be last TxnInprogress transaction
if index.TopPos != topPos {
return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos)
}
// use IO optimized for sequential access when verifying index
fSeq := seqReadAt(f)
return index.Verify(ctx, fSeq, ntxn, progress)
}
......@@ -22,6 +22,7 @@ package fs1
//go:generate ./py/gen-testdata
import (
"context"
"fmt"
"io/ioutil"
"log"
......@@ -218,6 +219,30 @@ func TestIndexSaveToPy(t *testing.T) {
}
}
func TestIndexBuildVerify(t *testing.T) {
index, err := BuildIndexForFile(context.Background(), "testdata/1.fs", nil)
if err != nil {
t.Fatalf("index build: %v", err)
}
if !index.Equal(_1fs_index) {
t.Fatal("computed index differ from expected")
}
_, err = index.VerifyForFile(context.Background(), "testdata/1.fs", -1, nil)
if err != nil {
t.Fatalf("index verify: %v", err)
}
pos0, _ := index.Get(0)
index.Set(0, pos0+1)
_, err = index.VerifyForFile(context.Background(), "testdata/1.fs", -1, nil)
if err == nil {
t.Fatalf("index verify: expected error after tweak")
}
}
func BenchmarkIndexLoad(b *testing.B) {
// FIXME small testdata/1.fs is not representative for benchmarks
for i := 0; i < b.N; i++ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment