Commit 083aed66 authored by Kirill Smelkov

.

parent 9003ac50
@@ -101,7 +101,7 @@ type FileStorage struct {
 	txnhMax TxnHeader
 
 	// driver client <- watcher: data file updates.
-	watchq chan interface{} // XXX
+	watchq chan watchEvent
 }
 
 // IStorageDriver
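
The struct change above replaces the untyped watchq channel (chan interface{}) with a typed chan watchEvent, so the watcher and Watch exchange a concrete event type instead of values that would need type assertions. A minimal standalone sketch of that typed-channel hand-off; the event type and the producer/consumer roles below are illustrative, not code from this commit:

package main

import "fmt"

// event mirrors the shape of watchEvent: one committed transaction.
type event struct {
	tid  uint64   // stands in for zodb.Tid
	oidv []uint64 // stands in for []zodb.Oid
}

func main() {
	watchq := make(chan event)

	// producer side (the role watch plays): one send per committed transaction.
	go func() {
		watchq <- event{tid: 0x0285cbac258bf266, oidv: []uint64{1, 2}}
		close(watchq)
	}()

	// consumer side (the role Watch plays): receives typed events directly,
	// with no type assertions as chan interface{} would have required.
	for ev := range watchq {
		fmt.Printf("tid=%016x oidv=%v\n", ev.tid, ev.oidv)
	}
}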
@@ -450,14 +450,14 @@ func (fs *FileStorage) watcher() {
 	_ = err
 }
 
-// watch watches updates to fs.file and notifies subscriber about new transaction via fs.watchq.
+// watch watches updates to .file and notifies Watch about new transactions.
 //
 // watch is the only place that mutates index and txnh{Min,Max}.
 // XXX ^^^ will change after commit is implemented.
 func (fs *FileStorage) watch() (err error) {
 	f := fs.file
 	idx := fs.index
-	defer xerr.Contextf(&err, "%s: watch", f.Name())
+	defer xerr.Contextf(&err, "%s: watcher", f.Name())
 
 	// XXX race -> start watching before FileStorage is returned to user
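
The defer xerr.Contextf(&err, ...) line above attaches the file name to whatever error watch returns. A standard-library-only sketch of the same deferred error-context idiom; the function and file names here are made up for illustration and xerr itself is not used:

package main

import (
	"errors"
	"fmt"
)

// watchExample shows the idiom: a deferred closure wraps the named return
// error with context on every error path.
func watchExample(name string) (err error) {
	defer func() {
		if err != nil {
			err = fmt.Errorf("%s: watcher: %w", name, err)
		}
	}()

	return errors.New("file truncated")
}

func main() {
	fmt.Println(watchExample("data.fs")) // data.fs: watcher: file truncated
}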
@@ -477,15 +477,10 @@ func (fs *FileStorage) watch() (err error) {
 	//
 	// besides relying on notify we also check file periodically to avoid
 	// stalls due to e.g. OS notification errors.
-	var it *Iter
-	itReset := func() { // reset it to (re)start from toppos with empty buffer
-		it = Iterate(seqReadAt(f), idx.TopPos, IterForward)
-	}
-	itReset()
-
 	tick := time.NewTicker(1*time.Second)
 	defer tick.Stop()
 
 	var t0partial time.Time
+mainloop:
 	for {
 		// XXX select here
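
As the comments above note, the watcher does not rely on OS notifications alone: it also wakes up on a 1-second ticker so that lost or erroneous notifications cannot stall it. A standalone sketch of that notify-plus-ticker select loop using the public fsnotify API; the watched path and the logging are illustrative, not taken from this commit:

package main

import (
	"log"
	"time"

	"github.com/fsnotify/fsnotify"
)

func main() {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// hypothetical data file to watch
	if err := w.Add("/tmp/data.fs"); err != nil {
		log.Fatal(err)
	}

	tick := time.NewTicker(1 * time.Second)
	defer tick.Stop()

	for {
		select {
		case ev := <-w.Events:
			log.Printf("notify: %s", ev) // woken by the OS - go re-scan the file
		case err := <-w.Errors:
			log.Printf("notify error: %s", err) // e.g. event overflow
		case <-tick.C:
			// periodic wakeup - re-scan even if a notification was lost
		}
	}
}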
@@ -503,90 +498,91 @@ func (fs *FileStorage) watch() (err error) {
 			return fmt.Errorf("file truncated (%d -> %d)", idx.TopPos, fsize)
 		}
 
-		// there is some data after toppos - try to read it.
-		err = it.NextTxn(LoadNoStrings)
-		if err != nil {
-			// transaction header could not be fully read.
-			//
-			// Even though FileStorage code always calls write with full txn
-			// header, the kernel could do the write in parts, e.g. if written
-			// region overlaps page boundary.
-			//
-			// we check for some time to distinguish in-progress write from just
-			// trailing garbage.
-			if errors.Cause(err) == io.ErrUnexpectedEOF {
-				now := time.Now()
-				if t0partial.IsZero() {
-					t0partial = now
-				} else if now.Sub(t0partial) > 3*time.Second {
-					return err // garbage
-				}
-			} else {
-				// not ok - e.g. IO error
-				// only EOF is ok - it is e.g. when transaction was aborted
-				if err != io.EOF {
-					return err
-				}
-
-				// EOF - reset t₀(partial)
-				t0partial = time.Time{}
-			}
-
-			// after any error (EOF, partial read) we have to retry with
-			// reset bufferring.
-			itReset()
-			continue
-		}
-
-		// read ok - reset t₀(partial)
-		t0partial = time.Time{}
-
-		// XXX dup wrt Index.Update
-		// we could successfully read the transaction header. Try to see now,
-		// whether it is finished transaction or not.
-		if it.Txnh.Status == zodb.TxnInprogress {
-			// not yet. we have to reset because transaction finish writes
-			// to what we have already buffered.
-			itReset()
-			continue
-		}
-
-		// it is fully-committed transaction. scan its data records to update
-		// our index & notify watchers. There is no expected errors here.
-		var oidv []zodb.Oid
-		update := map[zodb.Oid]int64{}
-		for {
-			err = it.NextData()
-			if err != nil {
-				err = okEOF(err)
-				if err != nil {
-					return err
-				}
-				break
-			}
-			update[it.Datah.Oid] = it.Datah.Pos
-			oidv = append(oidv, it.Datah.Oid)
-		}
-
-		// update index & txnh{MinMax}
-		fs.mu.Lock()
-		idx.TopPos = it.Txnh.Pos + it.Txnh.Len
-		for oid, pos := range update {
-			idx.Set(oid, pos)
-		}
-		fs.txnhMax.CloneFrom(&it.Txnh)
-		if fs.txnhMin.Len == 0 { // was empty
-			fs.txnhMin.CloneFrom(&it.Txnh)
-		}
-		fs.mu.Unlock()
-
-		// XXX -> watchq
+		// there is some data after toppos - try to advance as much as we can.
+		// start iterating afresh with empty buffer.
+		it := Iterate(seqReadAt(f), idx.TopPos, IterForward)
+		for {
+			err = it.NextTxn(LoadNoStrings)
+			if err != nil {
+				// transaction header could not be fully read.
+				//
+				// Even though FileStorage code always calls write with full txn
+				// header, the kernel could do the write in parts, e.g. if written
+				// region overlaps page boundary.
+				//
+				// we check for some time to distinguish in-progress write from just
+				// trailing garbage.
+				if errors.Cause(err) == io.ErrUnexpectedEOF {
+					now := time.Now()
+					if t0partial.IsZero() {
+						t0partial = now
+					} else if now.Sub(t0partial) > 3*time.Second {
+						return err // garbage
+					}
+				} else {
+					// not ok - e.g. IO error
+					// only EOF is ok - it can happen when transaction was aborted
+					if err != io.EOF {
+						return err
+					}
+
+					// EOF - reset t₀(partial)
+					t0partial = time.Time{}
+				}
+
+				// after any error (EOF, partial read) we have to resync
+				continue mainloop
+			}
+
+			// read ok - reset t₀(partial)
+			t0partial = time.Time{}
+
+			// XXX dup wrt Index.Update
+			// we could successfully read the transaction header. Try to see now,
+			// whether it is finished transaction or not.
+			if it.Txnh.Status == zodb.TxnInprogress {
+				// not yet. we have to resync because transaction finish writes
+				// to what we have already buffered.
+				continue mainloop
+			}
+
+			// it is fully-committed transaction. scan its data records to update
+			// our index & notify watchers. There is no expected errors here.
+			var oidv []zodb.Oid
+			update := map[zodb.Oid]int64{}
+			for {
+				err = it.NextData()
+				if err != nil {
+					err = okEOF(err)
+					if err != nil {
+						return err
+					}
+					break
+				}
+				update[it.Datah.Oid] = it.Datah.Pos
+				oidv = append(oidv, it.Datah.Oid)
+			}
+
+			// update index & txnh{MinMax}
+			fs.mu.Lock()
+			idx.TopPos = it.Txnh.Pos + it.Txnh.Len
+			for oid, pos := range update {
+				idx.Set(oid, pos)
+			}
+			fs.txnhMax.CloneFrom(&it.Txnh)
+			if fs.txnhMin.Len == 0 { // was empty
+				fs.txnhMin.CloneFrom(&it.Txnh)
+			}
+			fs.mu.Unlock()
+
+			// XXX cancel on close
+			fs.watchq <- watchEvent{it.Txnh.Tid, oidv}
+		}
 
 		select {
-		// XXX handle quit
+		// XXX handle close
 		case err := <-w.Errors:
 			if err == fsnotify.ErrEventOverflow {
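
The restructuring in this hunk wraps transaction processing in an inner for loop, so the watcher consumes every fully committed transaction already on disk before returning to the outer select, and it uses continue mainloop to resync, i.e. to restart iteration from idx.TopPos with an empty buffer after EOF, a partial read, or an in-progress transaction. A tiny self-contained example of Go's labeled continue, the mechanism behind continue mainloop; the loop bounds below are arbitrary:

package main

import "fmt"

func main() {
mainloop:
	for i := 0; i < 3; i++ {
		for j := 0; ; j++ {
			if j == 2 {
				continue mainloop // abandon the inner loop, resume the outer one
			}
			fmt.Println(i, j)
		}
	}
}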
@@ -609,9 +605,23 @@ func (fs *FileStorage) watch() (err error) {
 	}
 }
 
-func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
-	panic("TODO")
-	// <-fs.watchq
+// watchEvent is one event from watch to Watch
+type watchEvent struct {
+	tid  zodb.Tid
+	oidv []zodb.Oid
+}
+
+func (fs *FileStorage) Watch(ctx context.Context) (_ zodb.Tid, _ []zodb.Oid, err error) {
+	defer xerr.Contextf(&err, "%s: watch", fs.file.Name())
+
+	// XXX handle close
+	select {
+	case <-ctx.Done():
+		return zodb.InvalidTid, nil, ctx.Err()
+
+	case w := <-fs.watchq:
+		return w.tid, w.oidv, nil
+	}
 }
 
 // --- open + rebuild index ---
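
With this commit Watch becomes usable by driver clients: it blocks until the watcher delivers the next committed transaction on fs.watchq, or until the context is cancelled. A hypothetical consumer loop written as if inside this package; followUpdates is not part of the commit and assumes context and fmt are imported:

func followUpdates(ctx context.Context, fs *FileStorage) error {
	for {
		// Watch returns the tid of the next committed transaction together
		// with the oids of the objects it modified.
		tid, oidv, err := fs.Watch(ctx)
		if err != nil {
			return err // e.g. ctx.Err() once ctx is cancelled
		}
		fmt.Printf("new transaction %v: %d object(s) modified\n", tid, len(oidv))
	}
}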