Commit e28f2b01 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 083aed66
......@@ -444,45 +444,51 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
// --- watcher ---
func (fs *FileStorage) watcher() {
err := fs.watch()
// watcher watches updates to .file and notifies Watch about new transactions.
//
// watcher is the only place that mutates index and txnh{Min,Max}.
// XXX ^^^ will change after commit is implemented.
func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
defer w.Close() // XXX lclose
err := fs._watcher(w)
// XXX fs.watchErr = err (-> fail other operations)
_ = err
}
// watch watches updates to .file and notifies Watch about new transactions.
//
// watch is the only place that mutates index and txnh{Min,Max}.
// XXX ^^^ will change after commit is implemented.
func (fs *FileStorage) watch() (err error) {
func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
f := fs.file
idx := fs.index
defer xerr.Contextf(&err, "%s: watcher", f.Name())
// XXX race -> start watching before FileStorage is returned to user
// setup watcher to observe changes on f
w, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer w.Close() // XXX lclose
err = w.Add(f.Name())
if err != nil {
return err
}
// loop checking f.size vs topPos vs posLastChecked (XXX)
// loop checking f.size vs topPos
//
// besides relying on notify we also check file periodically to avoid
// stalls due to e.g. OS notification errors.
tick := time.NewTicker(1*time.Second)
defer tick.Stop()
var t0partial time.Time
first := true
mainloop:
for {
// XXX select here
if !first {
select {
// XXX handle close
case err := <-w.Errors:
if err != fsnotify.ErrEventOverflow {
return err
}
// events lost, but it is safe since we are always rechecking file size
case <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
case <-tick.C:
// recheck the file periodically.
}
}
first = false
// check f size, to see whether there could be any updates.
fi, err := f.Stat()
......@@ -580,28 +586,6 @@ mainloop:
// XXX cancel on close
fs.watchq <- watchEvent{it.Txnh.Tid, oidv}
}
select {
// XXX handle close
case err := <-w.Errors:
if err == fsnotify.ErrEventOverflow {
// events lost, but it is safe since we are always rechecking file size
continue
}
// shutdown
return err
case <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
continue
case <-tick.C:
// recheck the file periodically.
continue
}
}
}
......@@ -725,7 +709,24 @@ func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
// there might be simultaneous updates to the data file from outside.
// launch the watcher who will observe them.
go fs.watcher()
//
// the filesystem watcher is setup before fs returned to user to avoid
// race of missing early file writes.
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
w.Close() // XXX lclose
}
}()
err = w.Add(f.Name())
if err != nil {
return nil, err
}
go fs.watcher(w)
return fs, nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment