Commit 24faff13 authored by Kirill Smelkov's avatar Kirill Smelkov

X fs.watcher started to draftly work

parent e28f2b01
......@@ -453,6 +453,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
err := fs._watcher(w)
// XXX fs.watchErr = err (-> fail other operations)
_ = err
log.Print(err)
}
func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
......@@ -471,21 +472,25 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
mainloop:
for {
if !first {
tracef("select ...")
select {
// XXX handle close
case err := <-w.Errors:
tracef("error: %s", err)
if err != fsnotify.ErrEventOverflow {
return err
}
// events lost, but it is safe since we are always rechecking file size
case <-w.Events:
case e := <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
tracef("event: %s", e)
case <-tick.C:
// recheck the file periodically.
tracef("tick")
}
}
first = false
......@@ -496,6 +501,7 @@ mainloop:
return err
}
fsize := fi.Size()
tracef("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
switch {
case fsize == idx.TopPos:
continue // same as before
......@@ -506,6 +512,7 @@ mainloop:
// there is some data after toppos - try to advance as much as we can.
// start iterating afresh with empty buffer.
tracef("scanning ...")
it := Iterate(seqReadAt(f), idx.TopPos, IterForward)
for {
err = it.NextTxn(LoadNoStrings)
......@@ -543,6 +550,8 @@ mainloop:
// read ok - reset t₀(partial)
t0partial = time.Time{}
tracef("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
// XXX dup wrt Index.Update
// we could successfully read the transaction header. Try to see now,
......@@ -583,8 +592,11 @@ mainloop:
}
fs.mu.Unlock()
tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv) // XXX oidv=[0,0] - recheck
// XXX cancel on close
fs.watchq <- watchEvent{it.Txnh.Tid, oidv}
//tracef("zzz")
}
}
}
......@@ -599,11 +611,14 @@ func (fs *FileStorage) Watch(ctx context.Context) (_ zodb.Tid, _ []zodb.Oid, err
defer xerr.Contextf(&err, "%s: watch", fs.file.Name())
// XXX handle close
//tracef("watch -> select ...")
select {
case <-ctx.Done():
//tracef("\t-> canceled")
return zodb.InvalidTid, nil, ctx.Err()
case w := <-fs.watchq:
//tracef("\t-> data")
return w.tid, w.oidv, nil
}
}
......@@ -627,7 +642,9 @@ func (fs *FileStorage) Close() error {
//
// TODO read-write support
func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
fs := &FileStorage{}
fs := &FileStorage{
watchq: make(chan watchEvent),
}
f, err := os.Open(path)
if err != nil {
......
......@@ -35,6 +35,8 @@ import (
"lab.nexedi.com/kirr/go123/xerr"
)
import "log"
// one database transaction record
type dbEntry struct {
Header TxnHeader
......@@ -346,6 +348,15 @@ func BenchmarkIterate(b *testing.B) {
b.StopTimer()
}
var tracef = func(format string, argv ...interface{}) {
log.Printf("W " + format, argv...)
}
func init() {
log.SetFlags(log.Lmicroseconds)
}
func TestWatch(t *testing.T) {
//xtesting.NeedPy(t, "zodbtools")
......@@ -394,6 +405,8 @@ func TestWatch(t *testing.T) {
}
xcommit := func(at zodb.Tid, objv ...Object) zodb.Tid {
tracef("\n\n-> xcommit %s", at)
defer tracef("<- xcommit")
t.Helper()
tid, err := zcommit(at, objv...)
if err != nil {
......@@ -421,6 +434,9 @@ func TestWatch(t *testing.T) {
checkLastTid(at)
//time.Sleep(3*time.Second)
//tracef("AAA")
// commit -> check watcher observes what we committed.
for i := zodb.Oid(0); i < 1000; i++ {
at = xcommit(at,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment