Commit 4cc79d6f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 65c9a383
......@@ -462,7 +462,7 @@ func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
}
}
// XXX fs.watchErr = err (-> fail other operations)
// XXX fs.watchErr = err (-> fail other operations)?
if err != nil {
log.Print(err)
}
......@@ -479,7 +479,7 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
// loop checking f.size vs topPos
//
// besides relying on notify we also check file periodically to avoid
// besides relying on fsnotify we also check file periodically to avoid
// stalls due to e.g. OS notification errors.
tick := time.NewTicker(1*time.Second)
defer tick.Stop()
......@@ -488,14 +488,12 @@ func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
mainloop:
for {
if !first {
//tracef("select ...")
select {
case <-fs.down:
// closed
return nil
case err := <-w.Errors:
//tracef("error: %s", err)
if err != fsnotify.ErrEventOverflow {
return err
}
......@@ -504,12 +502,12 @@ mainloop:
case <-w.Events:
// we got some kind of "file was modified" event (e.g.
// write, truncate, chown ...) -> it is time to check the file again.
//tracef("event: %s", e)
case <-tick.C:
// recheck the file periodically.
//tracef("tick")
}
// XXX dequeue everything from w?
}
first = false
......@@ -519,7 +517,6 @@ mainloop:
return err
}
fsize := fi.Size()
//tracef("toppos: %d\tfsize: %d\n", idx.TopPos, fsize)
switch {
case fsize == idx.TopPos:
continue // same as before
......@@ -529,15 +526,14 @@ mainloop:
}
// there is some data after toppos - try to advance as much as we can.
// start iterating afresh with empty buffer.
//tracef("scanning ...")
// start iterating afresh with new empty buffer.
it := Iterate(seqReadAt(f), idx.TopPos, IterForward)
for {
err = it.NextTxn(LoadNoStrings)
if err != nil {
// transaction header could not be fully read.
//
// Even though FileStorage code always calls write with full txn
// even though FileStorage code always calls write with full txn
// header, the kernel could do the write in parts, e.g. if written
// region overlaps page boundary.
//
......@@ -568,19 +564,17 @@ mainloop:
// read ok - reset t₀(partial)
t0partial = time.Time{}
//tracef("@%d tid=%s st=%q", it.Txnh.Pos, it.Txnh.Tid, it.Txnh.Status)
// XXX dup wrt Index.Update
// we could successfully read the transaction header. Try to see now,
// whether it is finished transaction or not.
if it.Txnh.Status == zodb.TxnInprogress {
// not yet. we have to resync because transaction finish writes
// not yet. We have to resync because transaction finish writes
// to what we have already buffered.
continue mainloop
}
// it is fully-committed transaction. scan its data records to update
// it is fully-committed transaction. Scan its data records to update
// our index & notify watchers. There is no expected errors here.
var oidv []zodb.Oid
update := map[zodb.Oid]int64{}
......@@ -610,8 +604,7 @@ mainloop:
}
fs.mu.Unlock()
//tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
// notify client
if fs.watchq != nil {
select {
case <-fs.down:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment