Commit 48eabaaa authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b1229367
...@@ -75,7 +75,9 @@ import ( ...@@ -75,7 +75,9 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
// "lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"github.com/fsnotify/fsnotify"
) )
// FileStorage is a ZODB storage which stores data in simple append-only file // FileStorage is a ZODB storage which stores data in simple append-only file
...@@ -437,8 +439,61 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb. ...@@ -437,8 +439,61 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
// --- watcher --- // --- watcher ---
func (fs *FileStorage) watch() { func (fs *FileStorage) watcher() {
// XXX err := fs.watch()
// XXX fs.watchErr = err (-> fail other operations)
_ = err
}
func (fs *FileStorage) watch() (err error) {
f := fs.file
defer xerr.Contextf(&err, "%s: watch", f.Name())
// setup watcher to changes on f
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer w.Close() // XXX lclose
err = w.Add(f.Name())
if err != nil {
return err
}
toppos := fs.index.TopPos
// loop checking f.size vs topPos vs posLastChecked (XXX)
for {
fi, err := f.Stat()
if err != nil {
return err
}
fsize := fi.Size()
if fsize > toppos {
// XXX
}
select {
// XXX handle quit
case err := <-w.Errors:
if err == fsnotify.ErrEventOverflow {
// events lost, but it is safe since we are always rechecking file size
continue
}
// shutdown
return err
case <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
continue
// TODO + time.After(30s) to avoid stalls due to e.g. OS notification errors
}
}
} }
func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) { func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
...@@ -546,7 +601,7 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) { ...@@ -546,7 +601,7 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
// there might be simultaneous updates to the data file from outside. // there might be simultaneous updates to the data file from outside.
// launch the watcher who will observe them. // launch the watcher who will observe them.
//go fs.watcher() go fs.watcher()
return fs, nil return fs, nil
} }
...@@ -20,15 +20,19 @@ ...@@ -20,15 +20,19 @@
package fs1 package fs1
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
"os"
"os/exec"
"reflect" "reflect"
"testing" "testing"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr"
) )
// one database transaction record // one database transaction record
...@@ -341,3 +345,103 @@ func BenchmarkIterate(b *testing.B) { ...@@ -341,3 +345,103 @@ func BenchmarkIterate(b *testing.B) {
b.StopTimer() b.StopTimer()
} }
func TestWatch(t *testing.T) {
//xtesting.NeedPy(t, "zodbtools")
needZODBPy(t)
workdir := xworkdir(t)
tfs := workdir + "/t.fs"
// Object represents object state to be committed.
type Object struct {
oid zodb.Oid
data string
}
// zcommit commits new transaction into tfs with data specified by objv.
zcommit := func(at zodb.Tid, objv ...Object) (_ zodb.Tid, err error) {
defer xerr.Contextf(&err, "zcommit @%s", at)
// prepare text input for `zodb commit`
zin := &bytes.Buffer{}
fmt.Fprintf(zin, "user %q\n", "author")
fmt.Fprintf(zin, "description %q\n", fmt.Sprintf("test commit; at=%s", at))
fmt.Fprintf(zin, "extension %q\n", "")
for _, obj := range objv {
fmt.Fprintf(zin, "obj %s %d null:00\n", obj.oid, len(obj.data))
zin.WriteString(obj.data)
zin.WriteString("\n")
}
zin.WriteString("\n")
// run py `zodb commit`
cmd:= exec.Command("python2", "-m", "zodbtools.zodb", "commit", tfs, at.String())
cmd.Stdin = zin
cmd.Stderr = os.Stderr
out, err := cmd.Output()
if err != nil {
return zodb.InvalidTid, err
}
out = bytes.TrimSuffix(out, []byte("\n"))
tid, err := zodb.ParseTid(string(out))
if err != nil {
return zodb.InvalidTid, fmt.Errorf("committed, but invalid output: %s", err)
}
return tid, nil
}
xcommit := func(at zodb.Tid, objv ...Object) zodb.Tid {
t.Helper()
tid, err := zcommit(at, objv...)
if err != nil {
t.Fatal(err)
}
return tid
}
// force tfs creation & open tfs at go side
at := xcommit(0, Object{0, "data0"})
fs := xfsopen(t, tfs)
ctx := context.Background()
checkLastTid := func(lastOk zodb.Tid) {
t.Helper()
head, err := fs.LastTid(ctx)
if err != nil {
t.Fatalf("check last_tid: %s", err)
}
if head != lastOk {
t.Fatalf("check last_tid: got %s; want %s", head, lastOk)
}
}
checkLastTid(at)
// commit -> check watcher observes what we committed.
for i := zodb.Oid(0); i < 1000; i++ {
at = xcommit(at,
Object{0, fmt.Sprintf("data0.%d", i)},
Object{i, fmt.Sprintf("data%d", i)})
tid, objv, err := fs.Watch(ctx)
if err != nil {
t.Fatal(err)
}
if objvWant := []zodb.Oid{0, i}; !(tid == at && reflect.DeepEqual(objv, objvWant)) {
t.Fatalf("watch:\nhave: %s %s\nwant: %s %s", tid, objv, at, objvWant)
}
}
err := fs.Close()
if err != nil {
t.Fatal(err)
}
_, _, err = fs.Watch(ctx)
// assert err = "closed"
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment