Commit 7a036b15 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d8f33e1a
......@@ -46,6 +46,8 @@ type DriverOptions struct {
// it can get out of sync with the on-disk database file.
//
// The storage driver closes !nil Watchq when the driver is closed.
//
// TODO extend watchq to also receive errors from watcher.
Watchq chan<- CommitEvent
}
......
......@@ -565,12 +565,14 @@ mainloop:
for {
select {
case err := <-w.Errors:
traceWatch("drain: error: %s", err)
if err != fsnotify.ErrEventOverflow {
// unexpected error -> shutdown
return err
}
case <-w.Events:
case ev := <-w.Events:
traceWatch("drain: event: %s", ev)
default:
break drain
......@@ -642,8 +644,6 @@ mainloop:
errFirstRead = nil
}
// XXX dup wrt Index.Update
// we could successfully read the transaction header. Try to see now,
// whether it is finished transaction or not.
if it.Txnh.Status == zodb.TxnInprogress {
......@@ -653,9 +653,11 @@ mainloop:
}
// it is fully-committed transaction. Scan its data records to update
// our index & notify watchers. There is no expected errors here.
var oidv []zodb.Oid
update := map[zodb.Oid]int64{}
// our index & notify client watchers. There is no expected errors here.
//
// (keep in sync with Index.Update)
var δoid []zodb.Oid
δidx := map[zodb.Oid]int64{} // oid -> pos(data record)
for {
err = it.NextData()
if err != nil {
......@@ -666,14 +668,14 @@ mainloop:
break
}
update[it.Datah.Oid] = it.Datah.Pos
oidv = append(oidv, it.Datah.Oid)
δidx[it.Datah.Oid] = it.Datah.Pos
δoid = append(δoid, it.Datah.Oid)
}
// update index & txnh{Min,Max}
fs.mu.Lock()
idx.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range update {
for oid, pos := range δidx {
idx.Set(oid, pos)
}
fs.txnhMax.CloneFrom(&it.Txnh)
......@@ -682,7 +684,7 @@ mainloop:
}
fs.mu.Unlock()
traceWatch("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
traceWatch("-> tid=%s δoidv=%v", it.Txnh.Tid, δoid)
// notify client
if fs.watchq != nil {
......@@ -690,7 +692,7 @@ mainloop:
case <-fs.down:
return nil
case fs.watchq <- zodb.CommitEvent{it.Txnh.Tid, oidv}:
case fs.watchq <- zodb.CommitEvent{it.Txnh.Tid, δoid}:
// ok
}
}
......
......@@ -30,8 +30,6 @@ import (
"reflect"
"testing"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/internal/xtesting"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -356,7 +354,7 @@ func BenchmarkIterate(b *testing.B) {
b.StopTimer()
}
// TestWatch verifies that watcher can observes commits done from outside.
// TestWatch verifies that watcher can observe commits done from outside.
func TestWatch(t *testing.T) {
X := exc.Raiseif
......@@ -390,7 +388,10 @@ func TestWatch(t *testing.T) {
cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "commit", tfs, at.String())
cmd.Stdin = zin
cmd.Stderr = os.Stderr
out, err := cmd.Output(); X(err)
out, err := cmd.Output()
if err != nil {
return zodb.InvalidTid, err
}
out = bytes.TrimSuffix(out, []byte("\n"))
tid, err := zodb.ParseTid(string(out))
......@@ -452,7 +453,8 @@ func TestWatch(t *testing.T) {
Object{0, data0},
Object{i, datai})
e := <-watchq // XXX err?
// TODO also test for watcher errors
e := <-watchq
if objvWant := []zodb.Oid{0, i}; !(e.Tid == at && reflect.DeepEqual(e.Changev, objvWant)) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", e.Tid, e.Changev, at, objvWant)
......@@ -471,13 +473,6 @@ func TestWatch(t *testing.T) {
if ok {
t.Fatalf("watch after close -> %v; want closed", e)
}
_ = errors.Cause(nil)
// XXX e.Err == ErrClosed
//_, _, err = fs.Watch(ctx)
//if e, eWant := errors.Cause(err), os.ErrClosed; e != eWant {
// t.Fatalf("watch after close -> %v; want: cause %v", err, eWant)
//}
}
// TestOpenRecovery verifies how Open handles data file with not-finished voted
......
......@@ -472,6 +472,8 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// do not update the index immediately so that in case of error
// in the middle of txn's data, index stays consistent and
// correct for topPos pointing to previous transaction.
//
// (keep in sync with FileStorage.watcher)
update := map[zodb.Oid]int64{}
for {
err = it.NextData()
......@@ -495,7 +497,7 @@ func (index *Index) Update(ctx context.Context, r io.ReaderAt, topPos int64, pro
// notify progress
if progress != nil {
pd.TxnIndexed++
progress(pd) // XXX + update
progress(pd)
}
}
......@@ -696,7 +698,9 @@ func (index *Index) VerifyForFile(ctx context.Context, path string, ntxn int, pr
return nil, err
}
topPos := fi.Size() // XXX there might be last TxnInprogress transaction XXX
// FIXME there might be last TxnInprogress transaction.
// TODO -> try to read txn header, and if it is ø or in-progress - that's ok.
topPos := fi.Size()
if index.TopPos != topPos {
return nil, indexCorrupt(f, "topPos mismatch: data=%v index=%v", topPos, index.TopPos)
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment