Commit 4895b218 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 22a4acd4
...@@ -89,7 +89,7 @@ import ( ...@@ -89,7 +89,7 @@ import (
type FileStorage struct { type FileStorage struct {
file *os.File file *os.File
// protects updates to index and to txnh{Min,Max} - in other words // mu protects updates to index and to txnh{Min,Max} - in other words
// to everything that depends on what current last transaction is. // to everything that depends on what current last transaction is.
// mu also protects downErr. // mu also protects downErr.
mu sync.RWMutex mu sync.RWMutex
...@@ -500,13 +500,19 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) { ...@@ -500,13 +500,19 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
} }
} }
const watchTrace = false
func traceWatch(format string, argv ...interface{}) {
if !watchTrace {
return
}
log.Printf(" fs1: watcher: " + format, argv...)
}
// _watcher serves watcher and returns either when fs is closed (ok), or when // _watcher serves watcher and returns either when fs is closed (ok), or when
// it hits any kind of non-recoverable error. // it hits any kind of non-recoverable error.
func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error) (err error) { func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error) (err error) {
tracef := func(format string, argv ...interface{}) { traceWatch(">>>")
log.Printf(" fs1: watcher: " + format, argv...)
}
tracef(">>>")
f := fs.file f := fs.file
idx := fs.index idx := fs.index
...@@ -529,28 +535,28 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error) ...@@ -529,28 +535,28 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher, errFirstRead chan<- error)
mainloop: mainloop:
for { for {
if !first { if !first {
tracef("select ...") traceWatch("select ...")
select { select {
case <-fs.down: case <-fs.down:
// closed // closed
tracef("down") traceWatch("down")
return nil return nil
case err := <-w.Errors: case err := <-w.Errors:
tracef("error") traceWatch("error: %s", err)
if err != fsnotify.ErrEventOverflow { if err != fsnotify.ErrEventOverflow {
return err return err
} }
// events lost, but it is safe since we are always rechecking file size // events lost, but it is safe since we are always rechecking file size
case <-w.Events: case ev := <-w.Events:
// we got some kind of "file was modified" event (e.g. // we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again. // write, truncate, chown ...) -> it is time to check the file again.
tracef("event") traceWatch("event: %s", ev)
case <-tick.C: case <-tick.C:
// recheck the file periodically. // recheck the file periodically.
tracef("tick") traceWatch("tick")
} }
// we will be advancing through the file as much as we can. // we will be advancing through the file as much as we can.
...@@ -579,7 +585,7 @@ mainloop: ...@@ -579,7 +585,7 @@ mainloop:
return err return err
} }
fsize := fi.Size() fsize := fi.Size()
tracef("toppos: %d\tfsize: %d\n", idx.TopPos, fsize) traceWatch("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
switch { switch {
case fsize == idx.TopPos: case fsize == idx.TopPos:
continue // same as before continue // same as before
...@@ -590,7 +596,7 @@ mainloop: ...@@ -590,7 +596,7 @@ mainloop:
// there is some data after toppos - try to advance as much as we can. // there is some data after toppos - try to advance as much as we can.
// start iterating afresh with new empty buffer. // start iterating afresh with new empty buffer.
tracef("scanning ...") traceWatch("scanning ...")
it := Iterate(seqReadAt(f), idx.TopPos, IterForward) it := Iterate(seqReadAt(f), idx.TopPos, IterForward)
for { for {
err = it.NextTxn(LoadNoStrings) err = it.NextTxn(LoadNoStrings)
...@@ -629,7 +635,7 @@ mainloop: ...@@ -629,7 +635,7 @@ mainloop:
// read ok - reset t₀(partial) // read ok - reset t₀(partial)
t0partial = time.Time{} t0partial = time.Time{}
tracef("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status) traceWatch("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
if errFirstRead != nil { if errFirstRead != nil {
errFirstRead <- nil // ok errFirstRead <- nil // ok
...@@ -676,7 +682,7 @@ mainloop: ...@@ -676,7 +682,7 @@ mainloop:
} }
fs.mu.Unlock() fs.mu.Unlock()
tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv) traceWatch("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
// notify client // notify client
if fs.watchq != nil { if fs.watchq != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment