Commit 2206f438 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 42630e66
......@@ -109,7 +109,7 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
cache = NewCache(storDriver, 128 * 4*1024)
}
return &storage{
stor := &storage{
IStorageDriver: storDriver,
l1cache: cache,
......@@ -117,7 +117,10 @@ func OpenStorage(ctx context.Context, storageURL string, opt *OpenOptions) (ISto
watchReq: make(chan watchRequest),
watchTab: make(map[chan CommitEvent]struct{}),
}, nil
}
go stor.watcher() // XXX stop on close
return stor, nil
}
......
......@@ -503,6 +503,11 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
// _watcher serves watcher and returns either when fs is closed (ok), or when
// it hits any kind of non-recoverable error.
func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error) (err error) {
tracef := func(format string, argv ...interface{}) {
log.Printf(" fs1: watcher: " + format, argv...)
}
tracef(">>>")
f := fs.file
idx := fs.index
defer xerr.Contextf(&err, "%s: watcher", f.Name())
......@@ -524,12 +529,15 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error)
mainloop:
for {
if !first {
tracef("select ...")
select {
case <-fs.down:
// closed
tracef("down")
return nil
case err := <-w.Errors:
tracef("error")
if err != fsnotify.ErrEventOverflow {
return err
}
......@@ -538,9 +546,11 @@ mainloop:
case <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
tracef("event")
case <-tick.C:
// recheck the file periodically.
tracef("tick")
}
// we will be advancing through the file as much as we can.
......@@ -569,6 +579,7 @@ mainloop:
return err
}
fsize := fi.Size()
tracef("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
switch {
case fsize == idx.TopPos:
continue // same as before
......@@ -579,6 +590,7 @@ mainloop:
// there is some data after toppos - try to advance as much as we can.
// start iterating afresh with new empty buffer.
tracef("scanning ...")
it := Iterate(seqReadAt(f), idx.TopPos, IterForward)
for {
err = it.NextTxn(LoadNoStrings)
......@@ -617,6 +629,8 @@ mainloop:
// read ok - reset t₀(partial)
t0partial = time.Time{}
tracef("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
if errFirstRead != nil {
errFirstRead <- nil // ok
errFirstRead = nil
......@@ -662,6 +676,8 @@ mainloop:
}
fs.mu.Unlock()
tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
// notify client
if fs.watchq != nil {
select {
......@@ -712,6 +728,9 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
return nil, fmt.Errorf("fs1: %s: TODO write mode not implemented", path)
}
log.Print()
log.Printf("fs1 open, watchq: %v", opt.Watchq)
fs := &FileStorage{
watchq: opt.Watchq,
down: make(chan struct{}),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment