Commit db852511 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/fs1: Teach it to support notifications on database change

Following-up on 4d2c8b1d (go/zodb: Require drivers to provide
notifications for database change events) let's teach FileStorage to
support watching for database changes:

- add watcher who observes changes to data file via fsnotify.

- when we see that the file was updated - there is a tricky case to distinguish
  in-progress writes from just some garbage added at tail. See comments in
  _watcher for details.

- if the watcher sees there is indeed new transactions, the watcher
  updates index & txnh{Min,Max} and sends corresponding event to watchq.

- since index / txnh{Min,Max} can now be changed (by watcher) -> they
  are now protected by mu.

- consequently operations like LastTid, LastOid, Load, ... are all taught to
  observe index / txnh{Min,Max} state in read-snapshot way.

- add down & friends to indicate that the storage is no longer
  operational, if watcher sees there is a problem with data file.

- Open is reworked to start from loading/rebuilding index first, and then tail
  to watcher to detect whether what's after found topPos is garbage or another
  in-progress transaction. Consequently it is now possible to correctly open
  filestorage that is being currently written to and has in-progress transaction
  at tail.

The patch is a bit big, but the changes here are all tightly
interrelated.
parent 8eb0988e
...@@ -71,11 +71,15 @@ import ( ...@@ -71,11 +71,15 @@ import (
"log" "log"
"os" "os"
"sync" "sync"
"time"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
) )
// FileStorage is a ZODB storage which stores data in simple append-only file // FileStorage is a ZODB storage which stores data in simple append-only file
...@@ -84,15 +88,23 @@ import ( ...@@ -84,15 +88,23 @@ import (
// It is on-disk compatible with FileStorage from ZODB/py. // It is on-disk compatible with FileStorage from ZODB/py.
type FileStorage struct { type FileStorage struct {
file *os.File file *os.File
index *Index // oid -> data record position in transaction which last changed oid
// transaction headers for min/max transactions committed // mu protects updates to index and to txnh{Min,Max} - in other words
// (both with .Len=0 & .Tid=0 if database is empty) // to everything that depends on what current last transaction is.
txnhMin TxnHeader // mu also protects downErr.
txnhMax TxnHeader mu sync.RWMutex
index *Index // oid -> data record position in transaction which last changed oid
txnhMin TxnHeader // transaction headers for min/max transactions committed
txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty)
downErr error // !nil when the storage is no longer operational
// driver client <- watcher: database commits. // driver client <- watcher: database commits.
watchq chan<- zodb.CommitEvent // FIXME stub watchq chan<- zodb.CommitEvent
down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
watchWg sync.WaitGroup // to wait for watcher finish
} }
// IStorageDriver // IStorageDriver
...@@ -104,12 +116,24 @@ func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpErro ...@@ -104,12 +116,24 @@ func (fs *FileStorage) zerr(op string, args interface{}, err error) *zodb.OpErro
} }
func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) { func (fs *FileStorage) LastTid(_ context.Context) (zodb.Tid, error) {
// XXX must be under lock fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.downErr != nil {
return zodb.InvalidTid, fs.zerr("last_tid", nil, fs.downErr)
}
return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty return fs.txnhMax.Tid, nil // txnhMax.Tid = 0, if empty
} }
func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) { func (fs *FileStorage) LastOid(_ context.Context) (zodb.Oid, error) {
// XXX must be under lock fs.mu.RLock()
defer fs.mu.RUnlock()
if fs.downErr != nil {
return zodb.InvalidOid, fs.zerr("last_oid", nil, fs.downErr)
}
lastOid, _ := fs.index.Last() // returns zero-value, if empty lastOid, _ := fs.index.Last() // returns zero-value, if empty
return lastOid, nil return lastOid, nil
} }
...@@ -148,7 +172,13 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, seri ...@@ -148,7 +172,13 @@ func (fs *FileStorage) Load(_ context.Context, xid zodb.Xid) (buf *mem.Buf, seri
func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) { func (fs *FileStorage) load(xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
// lookup in index position of oid data record within latest transaction which changed this oid // lookup in index position of oid data record within latest transaction which changed this oid
fs.mu.RLock()
if err := fs.downErr; err != nil {
fs.mu.RUnlock()
return nil, 0, err
}
dataPos, ok := fs.index.Get(xid.Oid) dataPos, ok := fs.index.Get(xid.Oid)
fs.mu.RUnlock()
if !ok { if !ok {
return nil, 0, &zodb.NoObjectError{Oid: xid.Oid} return nil, 0, &zodb.NoObjectError{Oid: xid.Oid}
} }
...@@ -308,11 +338,18 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt ...@@ -308,11 +338,18 @@ func (e *iterStartError) NextTxn(_ context.Context) (*zodb.TxnInfo, zodb.IDataIt
// //
// if there is no such transaction returned error will be EOF. // if there is no such transaction returned error will be EOF.
func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) { func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, error) {
// XXX read snapshot under lock fs.mu.RLock()
// no longer operational
if err := fs.downErr; err != nil {
fs.mu.RUnlock()
return TxnHeader{}, err
}
// check for empty database // check for empty database
if fs.txnhMin.Len == 0 { if fs.txnhMin.Len == 0 {
// empty database - no such record // empty database - no such record
fs.mu.RUnlock()
return TxnHeader{}, io.EOF return TxnHeader{}, io.EOF
} }
...@@ -322,6 +359,8 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er ...@@ -322,6 +359,8 @@ func (fs *FileStorage) findTxnRecord(r io.ReaderAt, tid zodb.Tid) (TxnHeader, er
tmin.CloneFrom(&fs.txnhMin) tmin.CloneFrom(&fs.txnhMin)
tmax.CloneFrom(&fs.txnhMax) tmax.CloneFrom(&fs.txnhMax)
fs.mu.RUnlock()
if tmax.Tid < tid { if tmax.Tid < tid {
return TxnHeader{}, io.EOF // no such record return TxnHeader{}, io.EOF // no such record
} }
...@@ -411,158 +450,402 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb. ...@@ -411,158 +450,402 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
return ziter return ziter
} }
// --- open + rebuild index --- // --- watcher ---
// open opens FileStorage without loading index // watcher watches updates to .file and notifies client about new transactions.
func open(path string) (_ *FileStorage, err error) { //
fs := &FileStorage{} // watcher is the only place that mutates index and txnh{Min,Max}.
// XXX ^^^ will change after commit is implemented.
f, err := os.Open(path) //
if err != nil { // if errFirstRead is !nil, the error of reading first transaction header is sent to it.
return nil, err func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
} defer fs.watchWg.Done()
fs.file = f defer w.Close() // XXX lclose
defer func() { err := fs._watcher(w, errFirstRead)
if err != nil { // it is ok if we got read error due to file being closed
f.Close() // XXX -> lclose if e, _ := errors.Cause(err).(*os.PathError); e != nil && (e.Err == os.ErrClosed ||
// XXX it can also be internal.poll.ErrFileClosing
e.Err.Error() == "use of closed file") {
select {
case <-fs.down:
err = nil
default:
} }
}() }
// check file magic
fh := FileHeader{}
err = fh.Load(f)
if err != nil { if err != nil {
return nil, err log.Print(err)
} }
// FIXME rework opening logic to support case when last txn was committed only partially // if watcher failed with e.g. IO error, we no longer know what is real
// last_tid and which objects were modified after it.
// -> storage operations have to fail from now on.
fs.shutdown(err)
// determine topPos from file size if fs.watchq != nil {
fi, err := f.Stat() close(fs.watchq)
if err != nil {
return nil, err
} }
topPos := fi.Size() }
// read tidMin/tidMax const watchTrace = false
err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = okEOF(err) // e.g. it is EOF when file is empty func traceWatch(format string, argv ...interface{}) {
if err != nil { if !watchTrace {
return nil, err return
} }
err = fs.txnhMax.Load(f, topPos, LoadAll) log.Printf(" fs1: watcher: " + format, argv...)
// expect EOF forward }
// FIXME ^^^ it will be no EOF if a txn was committed only partially
if err != io.EOF { // _watcher serves watcher and returns either when fs is closed (ok), or when
if err == nil { // it hits any kind of non-recoverable error.
err = fmt.Errorf("%s: no EOF after topPos", f.Name()) func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error) (err error) {
traceWatch(">>>")
f := fs.file
idx := fs.index
defer xerr.Contextf(&err, "%s: watcher", f.Name())
defer func() {
if errFirstRead != nil {
errFirstRead <- err
errFirstRead = nil
} }
return nil, fmt.Errorf("%s: %s", f.Name(), err) }()
}
// .LenPrev must be good or EOF backward // loop checking f.size vs topPos
switch fs.txnhMax.LenPrev { //
case -1: // besides relying on fsnotify we also check file periodically to avoid
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos) // stalls due to e.g. OS notification errors.
case 0: tick := time.NewTicker(1*time.Second)
// ok - EOF backward defer tick.Stop()
var t0partial time.Time
first := true
mainloop:
for {
if !first {
traceWatch("select ...")
select {
case <-fs.down:
// closed
traceWatch("down")
return nil
case err := <-w.Errors:
traceWatch("error: %s", err)
if err != fsnotify.ErrEventOverflow {
return err
}
// events lost, but it is safe since we are always rechecking file size
case ev := <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
traceWatch("event: %s", ev)
case <-tick.C:
// recheck the file periodically.
traceWatch("tick")
}
default: // we will be advancing through the file as much as we can.
// .LenPrev is ok - read last previous record // drain everything what is currently left in fs watcher queue.
err = fs.txnhMax.LoadPrev(f, LoadAll) drain:
for {
select {
case err := <-w.Errors:
traceWatch("drain: error: %s", err)
if err != fsnotify.ErrEventOverflow {
// unexpected error -> shutdown
return err
}
case ev := <-w.Events:
traceWatch("drain: event: %s", ev)
default:
break drain
}
}
}
first = false
// check f size, to see whether there could be any updates.
fi, err := f.Stat()
if err != nil { if err != nil {
return nil, err return err
}
fsize := fi.Size()
traceWatch("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
switch {
case fsize == idx.TopPos:
continue // same as before
case fsize < idx.TopPos:
// XXX add pack support?
return fmt.Errorf("file truncated (%d -> %d)", idx.TopPos, fsize)
}
// there is some data after toppos - try to advance as much as we can.
// start iterating afresh with new empty buffer.
traceWatch("scanning ...")
it := Iterate(seqReadAt(f), idx.TopPos, IterForward)
for {
err = it.NextTxn(LoadNoStrings)
if err != nil {
// transaction header could not be fully read.
//
// even though FileStorage code always calls write with full txn
// header, the kernel could do the write in parts, e.g. if written
// region overlaps page boundary.
//
// we check for some time to distinguish in-progress write from just
// trailing garbage.
if errors.Cause(err) == io.ErrUnexpectedEOF {
now := time.Now()
if t0partial.IsZero() {
t0partial = now
} else if now.Sub(t0partial) > 3*time.Second {
return err // garbage
}
} else {
// only EOF is ok - it can happen when transaction was aborted,
// or when we reach file end after scanning several txns.
if err != io.EOF {
// not ok - e.g. IO or consistency check error
return err
}
// EOF - reset t₀(partial)
t0partial = time.Time{}
}
// after any error (EOF, partial read) we have to resync
continue mainloop
}
// read ok - reset t₀(partial)
t0partial = time.Time{}
traceWatch("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
if errFirstRead != nil {
errFirstRead <- nil // ok
errFirstRead = nil
}
// we could successfully read the transaction header. Try to see now,
// whether it is finished transaction or not.
if it.Txnh.Status == zodb.TxnInprogress {
// not yet. We have to resync because transaction finish writes
// to what we have already buffered.
continue mainloop
}
// it is fully-committed transaction. Scan its data records to update
// our index & notify client watchers. There is no expected errors here.
//
// (keep in sync with Index.Update)
var δoid []zodb.Oid
δidx := map[zodb.Oid]int64{} // oid -> pos(data record)
for {
err = it.NextData()
if err != nil {
err = okEOF(err)
if err != nil {
return err
}
break
}
δidx[it.Datah.Oid] = it.Datah.Pos
δoid = append(δoid, it.Datah.Oid)
}
// update index & txnh{Min,Max}
fs.mu.Lock()
idx.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range δidx {
idx.Set(oid, pos)
}
fs.txnhMax.CloneFrom(&it.Txnh)
if fs.txnhMin.Len == 0 { // was empty
fs.txnhMin.CloneFrom(&it.Txnh)
}
fs.mu.Unlock()
traceWatch("-> tid=%s δoidv=%v", it.Txnh.Tid, δoid)
// notify client
if fs.watchq != nil {
select {
case <-fs.down:
return nil
case fs.watchq <- zodb.CommitEvent{it.Txnh.Tid, δoid}:
// ok
}
}
} }
} }
}
return fs, nil // --- open + rebuild index ---
// shutdown marks storage as no longer operational with specified reason.
//
// only the first call takes the effect.
func (fs *FileStorage) shutdown(reason error) {
fs.downOnce.Do(func() {
fs.errClose = fs.file.Close()
close(fs.down)
fs.mu.Lock()
defer fs.mu.Unlock()
fs.downErr = fmt.Errorf("not operational due: %s", reason)
})
}
func (fs *FileStorage) Close() error {
fs.shutdown(fmt.Errorf("closed"))
fs.watchWg.Wait()
if fs.errClose != nil {
return fs.zerr("close", nil, fs.errClose)
}
// TODO if opened !ro -> .saveIndex()
return nil
} }
// Open opens FileStorage @path. // Open opens FileStorage @path.
// func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileStorage, err error) {
// TODO read-write support // TODO read-write support
func Open(ctx context.Context, path string) (_ *FileStorage, err error) { if !opt.ReadOnly {
// open data file return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
fs, err := open(path) }
fs := &FileStorage{
watchq: opt.Watchq,
down: make(chan struct{}),
}
f, err := os.Open(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fs.file = f
defer func() { defer func() {
if err != nil { if err != nil {
fs.file.Close() // XXX lclose fs.shutdown(err)
} }
}() }()
// load-verify / rebuild index // XXX wrap err with "open <path>" ?
err = fs.loadIndex(ctx)
// check file magic
fh := FileHeader{}
err = fh.Load(f)
if err != nil { if err != nil {
log.Print(err) return nil, err
log.Printf("%s: index recompute...", path) }
fs.index, err = fs.computeIndex(ctx)
// load index
fseq := seqReadAt(f)
index, err := LoadIndexFile(f.Name() + ".index")
if err == nil {
// index exists & loaded ok - quickly verify its sanity for last 100 transactions
_, err = index.Verify(ctx, fseq, 100, nil/*no progress*/)
if err != nil { if err != nil {
return nil, err index = nil // not sane - we will rebuild
} }
}
// TODO if opened !ro -> .saveIndex() if err != nil {
// index either did not exist, or corrupt or IO error - rebuild it from scratch
log.Print(err)
log.Printf("%s: index rebuild...", path)
index, err = BuildIndex(ctx, fseq, nil/*no progress; XXX log it? */)
} else {
// index loaded. In particular this gives us index.TopPos that is, possibly
// outdated, valid position for start of a transaction in the data file.
// Update the index starting from that till latest transaction.
err = index.Update(ctx, fseq, -1, nil/*no progress; XXX log it? */)
} }
return fs, nil // it can be either garbage or in-progress transaction.
} // defer to watcher to clarify this for us.
checkTailGarbage := false
func (fs *FileStorage) Close() error { if errors.Cause(err) == io.ErrUnexpectedEOF {
err := fs.file.Close() err = nil
if fs.watchq != nil { checkTailGarbage = true
close(fs.watchq)
} }
if err != nil { if err != nil {
return fs.zerr("close", nil, err) return nil, err
} }
// TODO if opened !ro -> .saveIndex() fs.index = index
return nil
}
func (fs *FileStorage) computeIndex(ctx context.Context) (index *Index, err error) { // now we have the index covering till last transaction in data file.
// XXX lock? // fill-in min/max txnh
return BuildIndex(ctx, seqReadAt(fs.file), nil/*no progress; XXX somehow log it? */) if index.TopPos > txnValidFrom {
} err = fs.txnhMin.Load(f, txnValidFrom, LoadAll)
err = noEOF(err)
if err != nil {
return nil, err
}
// loadIndex loads on-disk index to RAM and verifies it against data lightly _ = fs.txnhMax.Load(f, index.TopPos, LoadAll)
func (fs *FileStorage) loadIndex(ctx context.Context) (err error) { // NOTE it will be EOF on stable storage, but it might be non-EOF
// XXX lock? // if a txn-in-progress was committed only partially. We care only
defer xerr.Contextf(&err, "%s", fs.file.Name()) // that we read .LenPrev ok.
switch fs.txnhMax.LenPrev {
case -1:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction)", f.Name(), fs.txnhMax.Pos)
case 0:
return nil, fmt.Errorf("%s: could not read LenPrev @%d (last transaction): unexpected EOF backward", f.Name(), fs.txnhMax.Pos)
default:
// .LenPrev is ok - read last previous record
err = fs.txnhMax.LoadPrev(f, LoadAll)
if err != nil {
return nil, err
}
}
}
index, err := LoadIndexFile(fs.file.Name() + ".index") // there might be simultaneous updates to the data file from outside.
// launch the watcher who will observe them.
//
// the filesystem watcher is setup before fs returned to user to avoid
// race of missing early file writes.
w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
return err return nil, err
} }
err = w.Add(f.Name())
topPos := fs.txnhMax.Pos + fs.txnhMax.Len if err != nil {
if index.TopPos != topPos { w.Close() // XXX lclose
return fmt.Errorf("inconsistent index topPos: data=%d index=%d", topPos, index.TopPos) return nil, err
} }
// quickly verify index sanity for last 100 transactions var errFirstRead chan error
_, err = index.Verify(ctx, seqReadAt(fs.file), 100, nil/*no progress*/) if checkTailGarbage {
if err != nil { defer xerr.Contextf(&err, "open %s: checking whether it is garbage @%d", path, index.TopPos)
return err errFirstRead = make(chan error, 1)
} }
fs.index = index fs.watchWg.Add(1)
return nil go fs.watcher(w, errFirstRead)
}
// saveIndex flushes in-RAM index to disk if checkTailGarbage {
func (fs *FileStorage) saveIndex() (err error) { select {
// XXX lock? case <-ctx.Done():
defer xerr.Contextf(&err, "%s", fs.file.Name()) return nil, ctx.Err()
err = fs.index.SaveFile(fs.file.Name() + ".index") case err = <-errFirstRead:
if err != nil { if err != nil {
return err return nil, err // it was garbage
}
}
} }
// XXX fsync here? return fs, nil
return nil
} }
// Copyright (C) 2017 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -20,15 +20,21 @@ ...@@ -20,15 +20,21 @@
package fs1 package fs1
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os"
"os/exec"
"reflect" "reflect"
"testing" "testing"
"lab.nexedi.com/kirr/neo/go/internal/xtesting"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr"
) )
// one database transaction record // one database transaction record
...@@ -109,7 +115,13 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) { ...@@ -109,7 +115,13 @@ func checkLoad(t *testing.T, fs *FileStorage, xid zodb.Xid, expect objState) {
} }
func xfsopen(t testing.TB, path string) *FileStorage { func xfsopen(t testing.TB, path string) *FileStorage {
fs, err := Open(context.Background(), path) t.Helper()
return xfsopenopt(t, path, &zodb.DriverOptions{ReadOnly: true})
}
func xfsopenopt(t testing.TB, path string, opt *zodb.DriverOptions) *FileStorage {
t.Helper()
fs, err := Open(context.Background(), path, opt)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -117,7 +129,7 @@ func xfsopen(t testing.TB, path string) *FileStorage { ...@@ -117,7 +129,7 @@ func xfsopen(t testing.TB, path string) *FileStorage {
} }
func TestLoad(t *testing.T) { func TestLoad(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs") // TODO open read-only fs := xfsopen(t, "testdata/1.fs")
defer exc.XRun(fs.Close) defer exc.XRun(fs.Close)
// current knowledge of what was "before" for an oid as we scan over // current knowledge of what was "before" for an oid as we scan over
...@@ -266,7 +278,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv ...@@ -266,7 +278,7 @@ func testIterate(t *testing.T, fs *FileStorage, tidMin, tidMax zodb.Tid, expectv
} }
func TestIterate(t *testing.T) { func TestIterate(t *testing.T) {
fs := xfsopen(t, "testdata/1.fs") // TODO open ro fs := xfsopen(t, "testdata/1.fs")
defer exc.XRun(fs.Close) defer exc.XRun(fs.Close)
// all []tids in test database // all []tids in test database
...@@ -302,7 +314,7 @@ func TestIterate(t *testing.T) { ...@@ -302,7 +314,7 @@ func TestIterate(t *testing.T) {
} }
func BenchmarkIterate(b *testing.B) { func BenchmarkIterate(b *testing.B) {
fs := xfsopen(b, "testdata/1.fs") // TODO open ro fs := xfsopen(b, "testdata/1.fs")
defer exc.XRun(fs.Close) defer exc.XRun(fs.Close)
ctx := context.Background() ctx := context.Background()
...@@ -341,3 +353,188 @@ func BenchmarkIterate(b *testing.B) { ...@@ -341,3 +353,188 @@ func BenchmarkIterate(b *testing.B) {
b.StopTimer() b.StopTimer()
} }
// TestWatch verifies that watcher can observe commits done from outside.
func TestWatch(t *testing.T) {
X := exc.Raiseif
xtesting.NeedPy(t, "zodbtools")
workdir := xworkdir(t)
tfs := workdir + "/t.fs"
// Object represents object state to be committed.
type Object struct {
oid zodb.Oid
data string
}
// zcommit commits new transaction into tfs with data specified by objv.
zcommit := func(at zodb.Tid, objv ...Object) (_ zodb.Tid, err error) {
defer xerr.Contextf(&err, "zcommit @%s", at)
// prepare text input for `zodb commit`
zin := &bytes.Buffer{}
fmt.Fprintf(zin, "user %q\n", "author")
fmt.Fprintf(zin, "description %q\n", fmt.Sprintf("test commit; at=%s", at))
fmt.Fprintf(zin, "extension %q\n", "")
for _, obj := range objv {
fmt.Fprintf(zin, "obj %s %d null:00\n", obj.oid, len(obj.data))
zin.WriteString(obj.data)
zin.WriteString("\n")
}
zin.WriteString("\n")
// run py `zodb commit`
cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "commit", tfs, at.String())
cmd.Stdin = zin
cmd.Stderr = os.Stderr
out, err := cmd.Output()
if err != nil {
return zodb.InvalidTid, err
}
out = bytes.TrimSuffix(out, []byte("\n"))
tid, err := zodb.ParseTid(string(out))
if err != nil {
return zodb.InvalidTid, fmt.Errorf("committed, but invalid output: %s", err)
}
return tid, nil
}
xcommit := func(at zodb.Tid, objv ...Object) zodb.Tid {
t.Helper()
tid, err := zcommit(at, objv...); X(err)
return tid
}
// force tfs creation & open tfs at go side
at := xcommit(0, Object{0, "data0"})
watchq := make(chan zodb.CommitEvent)
fs := xfsopenopt(t, tfs, &zodb.DriverOptions{ReadOnly: true, Watchq: watchq})
ctx := context.Background()
checkLastTid := func(lastOk zodb.Tid) {
t.Helper()
head, err := fs.LastTid(ctx); X(err)
if head != lastOk {
t.Fatalf("check last_tid: got %s; want %s", head, lastOk)
}
}
checkLastTid(at)
checkLoad := func(at zodb.Tid, oid zodb.Oid, dataOk string, serialOk zodb.Tid) {
t.Helper()
xid := zodb.Xid{at, oid}
buf, serial, err := fs.Load(ctx, xid); X(err)
data := string(buf.XData())
if !(data == dataOk && serial == serialOk) {
t.Fatalf("check load %s:\nhave: %q %s\nwant: %q %s",
xid, data, serial, dataOk, serialOk)
}
}
// commit -> check watcher observes what we committed.
//
// XXX python `import pkg_resources` takes ~ 300ms.
// https://github.com/pypa/setuptools/issues/510
//
// Since pkg_resources are used everywhere (e.g. in zodburi to find all
// uri resolvers) this import slowness becomes the major part of time to
// run py `zodb commit`.
//
// if one day it is either fixed, or worked around, we could ↑ 10 to 100.
for i := zodb.Oid(1); i <= 10; i++ {
data0 := fmt.Sprintf("data0.%d", i)
datai := fmt.Sprintf("data%d", i)
at = xcommit(at,
Object{0, data0},
Object{i, datai})
// TODO also test for watcher errors
e := <-watchq
if objvWant := []zodb.Oid{0, i}; !(e.Tid == at && reflect.DeepEqual(e.Changev, objvWant)) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", e.Tid, e.Changev, at, objvWant)
}
checkLastTid(at)
// make sure we can load what was committed.
checkLoad(at, 0, data0, at)
checkLoad(at, i, datai, at)
}
err := fs.Close(); X(err)
e, ok := <-watchq
if ok {
t.Fatalf("watch after close -> %v; want closed", e)
}
}
// TestOpenRecovery verifies how Open handles data file with not-finished voted
// transaction in the end.
func TestOpenRecovery(t *testing.T) {
X := exc.Raiseif
main, err := ioutil.ReadFile("testdata/1.fs"); X(err)
index, err := ioutil.ReadFile("testdata/1.fs.index"); X(err)
lastTidOk := _1fs_dbEntryv[len(_1fs_dbEntryv)-1].Header.Tid
topPos := int64(_1fs_indexTopPos)
voteTail, err := ioutil.ReadFile("testdata/1voted.tail"); X(err)
workdir := xworkdir(t)
ctx := context.Background()
// checkL runs f on main + voteTail[:l] . Two cases are verified:
// 1) when index is already present, and
// 2) when initially there is no index.
checkL := func(t *testing.T, l int, f func(t *testing.T, tfs string)) {
tfs := fmt.Sprintf("%s/1+vote%d.fs", workdir, l)
t.Run(fmt.Sprintf("oldindex=n/tail=+vote%d", l), func(t *testing.T) {
err := ioutil.WriteFile(tfs, append(main, voteTail[:l]...), 0600); X(err)
f(t, tfs)
})
t.Run(fmt.Sprintf("oldindex=y/tail=+vote%d", l), func(t *testing.T) {
err := ioutil.WriteFile(tfs+".index", index, 0600); X(err)
f(t, tfs)
})
}
// if txn header can be fully read - it should be all ok
lok := []int{0}
for l := len(voteTail); l >= TxnHeaderFixSize; l-- {
lok = append(lok, l)
}
for _, l := range lok {
checkL(t, l, func(t *testing.T, tfs string) {
fs := xfsopen(t, tfs)
defer func() {
err = fs.Close(); X(err)
}()
head, err := fs.LastTid(ctx); X(err)
if head != lastTidOk {
t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk)
}
})
}
// if txn header is stably incomplete - open should fail
// XXX better check 0..sizeof(txnh)-1 but in this range each check is slow.
for _, l := range []int{TxnHeaderFixSize-1,1} {
checkL(t, l, func(t *testing.T, tfs string) {
_, err := Open(ctx, tfs, &zodb.DriverOptions{ReadOnly: true})
estr := ""
if err != nil {
estr = err.Error()
}
ewant := fmt.Sprintf("open %s: checking whether it is garbage @%d: %s",
tfs, topPos, &RecordError{tfs, "transaction record", topPos, "read", io.ErrUnexpectedEOF})
if estr != ewant {
t.Fatalf("unexpected error:\nhave: %q\nwant: %q", estr, ewant)
}
})
}
}
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -152,7 +152,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error { ...@@ -152,7 +152,7 @@ func (fh *FileHeader) Load(r io.ReaderAt) error {
_, err := r.ReadAt(fh.Magic[:], 0) _, err := r.ReadAt(fh.Magic[:], 0)
err = okEOF(err) err = okEOF(err)
if err != nil { if err != nil {
return err return fmt.Errorf("%sread magic: %s", ioprefix(r), err)
} }
if string(fh.Magic[:]) != Magic { if string(fh.Magic[:]) != Magic {
return fmt.Errorf("%sinvalid fs1 magic %q", ioprefix(r), fh.Magic) return fmt.Errorf("%sinvalid fs1 magic %q", ioprefix(r), fh.Magic)
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -453,7 +453,10 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro ...@@ -453,7 +453,10 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
return err return err
} }
// XXX check txnh.Status != TxnInprogress // this transaction was only voted, not fully committed yet.
if it.Txnh.Status == zodb.TxnInprogress {
return nil
}
// check for topPos overlapping txn & whether we are done. // check for topPos overlapping txn & whether we are done.
// topPos=-1 will never match here // topPos=-1 will never match here
...@@ -469,7 +472,9 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro ...@@ -469,7 +472,9 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// do not update the index immediately so that in case of error // do not update the index immediately so that in case of error
// in the middle of txn's data, index stays consistent and // in the middle of txn's data, index stays consistent and
// correct for topPos pointing to previous transaction. // correct for topPos pointing to previous transaction.
update := map[zodb.Oid]int64{} // XXX malloc every time -> better reuse //
// (keep in sync with FileStorage.watcher)
update := map[zodb.Oid]int64{}
for { for {
err = it.NextData() err = it.NextData()
if err != nil { if err != nil {
...@@ -693,7 +698,9 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr ...@@ -693,7 +698,9 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr
return nil, err return nil, err
} }
topPos := fi.Size() // XXX there might be last TxnInprogress transaction // FIXME there might be last TxnInprogress transaction.
// TODO -> try to read txn header, and if it is ø or in-progress - that's ok.
topPos := fi.Size()
if index.TopPos != topPos { if index.TopPos != topPos {
return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos) return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos)
} }
......
#!/usr/bin/env python2 #!/usr/bin/env python2
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2017-2018 Nexedi SA and Contributors. # Copyright (C) 2017-2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -21,7 +21,10 @@ ...@@ -21,7 +21,10 @@
"""generate reference fs1 database and index for tests""" """generate reference fs1 database and index for tests"""
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from zodbtools.test.gen_testdata import gen_testdb from ZODB import DB
from zodbtools.test.gen_testdata import gen_testdb, precommit
from os import stat, remove
from shutil import copyfile
from golang.gcompat import qq from golang.gcompat import qq
import struct import struct
...@@ -133,6 +136,38 @@ def main(): ...@@ -133,6 +136,38 @@ def main():
emit("\t\t},") emit("\t\t},")
emit("\t},") emit("\t},")
emit("}") emit("}")
stor.close()
# prepare file with voted (not fully committed) tail
voted = "testdata/1voted.fs"
copyfile(outfs, voted)
vstor = FileStorage(voted)
vdb = DB(vstor)
vconn = vdb.open()
vroot = vconn.root()
vroot._p_activate() # to know its current serial
txn = precommit(u"author", u"description", {'aaa': 'bbb'})
vstor.tpc_begin(txn)
vstor.store(vroot._p_oid, vroot._p_serial, '000 data 000', '', txn)
vstor.tpc_vote(txn)
# NO tpc_finish here so that status remain 'c' (voted) instead of ' ' (committed)
st = stat(outfs)
l = st.st_size
vf = open(voted, 'rb')
vf.seek(l)
voted_tail = vf.read()
assert voted_tail[-1+8+8+1] == 'c' # voted, not finished (' ')
with open("testdata/1voted.tail", "wb") as vt:
vt.write(voted_tail)
remove(voted)
remove(voted+".index")
remove(voted+".tmp")
remove(voted+".lock")
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -22,8 +22,6 @@ package fs1 ...@@ -22,8 +22,6 @@ package fs1
import ( import (
"context" "context"
"fmt"
"log"
"net/url" "net/url"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -34,21 +32,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.I ...@@ -34,21 +32,7 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (zodb.I
// XXX u.Path is not always raw path - recheck and fix // XXX u.Path is not always raw path - recheck and fix
path := u.Host + u.Path path := u.Host + u.Path
// XXX readonly stub fs, err := Open(ctx, path, opt)
// XXX place = ?
if !opt.ReadOnly {
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
}
// FIXME handle opt.Watchq
// for now we pretend as if the database is not changing.
if opt.Watchq != nil {
log.Print("fs1: FIXME: watchq support not implemented - there" +
"won't be notifications about database changes")
}
fs, err := Open(ctx, path)
fs.watchq = opt.Watchq
return fs, err return fs, err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment