Commit 9003ac50 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5167f79c
...@@ -71,6 +71,7 @@ import ( ...@@ -71,6 +71,7 @@ import (
"log" "log"
"os" "os"
"sync" "sync"
"time"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -78,6 +79,7 @@ import ( ...@@ -78,6 +79,7 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
) )
// FileStorage is a ZODB storage which stores data in simple append-only file // FileStorage is a ZODB storage which stores data in simple append-only file
...@@ -98,7 +100,8 @@ type FileStorage struct { ...@@ -98,7 +100,8 @@ type FileStorage struct {
txnhMin TxnHeader txnhMin TxnHeader
txnhMax TxnHeader txnhMax TxnHeader
watchq chan XXX // driver client <- watcher: data file updates.
watchq chan interface{} // XXX
} }
// IStorageDriver // IStorageDriver
...@@ -447,13 +450,18 @@ func (fs *FileStorage) watcher() { ...@@ -447,13 +450,18 @@ func (fs *FileStorage) watcher() {
_ = err _ = err
} }
// watch watches updates to fs.file and notifies subscriber about new transaction via fs.watchq.
//
// watch is the only place that mutates index and txnh{Min,Max}.
// XXX ^^^ will change after commit is implemented.
func (fs *FileStorage) watch() (err error) { func (fs *FileStorage) watch() (err error) {
f := fs.file f := fs.file
idx := fs.index
defer xerr.Contextf(&err, "%s: watch", f.Name()) defer xerr.Contextf(&err, "%s: watch", f.Name())
// XXX race -> start watching before FileStorage is returned to user // XXX race -> start watching before FileStorage is returned to user
// setup watcher to changes on f // setup watcher to observe changes on f
w, err := fsnotify.NewWatcher() w, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
return err return err
...@@ -469,34 +477,33 @@ func (fs *FileStorage) watch() (err error) { ...@@ -469,34 +477,33 @@ func (fs *FileStorage) watch() (err error) {
// //
// besides relying on notify we also check file periodically to avoid // besides relying on notify we also check file periodically to avoid
// stalls due to e.g. OS notification errors. // stalls due to e.g. OS notification errors.
toppos := fs.index.TopPos
var it *Iter var it *Iter
resetit := func() { // reset it to start from toppos with empty buffer itReset := func() { // reset it to (re)start from toppos with empty buffer
it = Iterate(seqReadAt(f), toppos, IterForward) it = Iterate(seqReadAt(f), idx.TopPos, IterForward)
} }
resetit() itReset()
tick := time.NewTicker(1*time.Second) tick := time.NewTicker(1*time.Second)
defer tick.Stop() defer tick.Stop()
var tpartWrite time.Time var t0partial time.Time
for { for {
// XXX select here // XXX select here
// check f size, to see whether there could be any updates.
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
return err return err
} }
fsize := fi.Size() fsize := fi.Size()
switch { switch {
case fsize == toppos: case fsize == idx.TopPos:
continue // same as before continue // same as before
case fsize < toppos: case fsize < idx.TopPos:
// XXX add pack support // XXX add pack support
return fmt.Errorf("file truncated (%d -> %d)", toppos, fsize) return fmt.Errorf("file truncated (%d -> %d)", idx.TopPos, fsize)
} }
// there is some data after toppos - try to read it. always create buffer // there is some data after toppos - try to read it.
// anew, since there could be written and then aborted transactions.
err = it.NextTxn(LoadNoStrings) err = it.NextTxn(LoadNoStrings)
if err != nil { if err != nil {
// transaction header could not be fully read. // transaction header could not be fully read.
...@@ -506,33 +513,34 @@ func (fs *FileStorage) watch() (err error) { ...@@ -506,33 +513,34 @@ func (fs *FileStorage) watch() (err error) {
// region overlaps page boundary. // region overlaps page boundary.
// //
// we check for some time to distinguish in-progress write from just // we check for some time to distinguish in-progress write from just
// garbage. // trailing garbage.
if errors.Cause(err) == io.ErrUnexpectedEOF { if errors.Cause(err) == io.ErrUnexpectedEOF {
now := time.Now() now := time.Now()
if tpartWrite.IsZero() { if t0partial.IsZero() {
tpartWrite = now t0partial = now
} else if now.Sub(tpartialStart) > 3*time.Second { } else if now.Sub(t0partial) > 3*time.Second {
return err return err // garbage
} }
} else { } else {
// not ok - e.g. IO error // not ok - e.g. IO error
// EOF is ok - it is e.g. when transaction was aborted // only EOF is ok - it is e.g. when transaction was aborted
if err != io.EOF { if err != io.EOF {
return err return err
} }
// EOF - reset t(partial) // EOF - reset t(partial)
tpartWrite = time.Time{} t0partial = time.Time{}
} }
// after an error (EOF, partial read) we have to retry with // after any error (EOF, partial read) we have to retry with
// reset bufferring. // reset bufferring.
resetit() itReset()
continue continue
} }
tpartWrite = time.Time{} // reset // read ok - reset t₀(partial)
t0partial = time.Time{}
// XXX dup wrt Index.Update // XXX dup wrt Index.Update
...@@ -540,8 +548,8 @@ func (fs *FileStorage) watch() (err error) { ...@@ -540,8 +548,8 @@ func (fs *FileStorage) watch() (err error) {
// whether it is finished transaction or not. // whether it is finished transaction or not.
if it.Txnh.Status == zodb.TxnInprogress { if it.Txnh.Status == zodb.TxnInprogress {
// not yet. we have to reset because transaction finish writes // not yet. we have to reset because transaction finish writes
// what we have already buffered. // to what we have already buffered.
resetit() itReset()
continue continue
} }
...@@ -563,11 +571,15 @@ func (fs *FileStorage) watch() (err error) { ...@@ -563,11 +571,15 @@ func (fs *FileStorage) watch() (err error) {
oidv = append(oidv, it.Datah.Oid) oidv = append(oidv, it.Datah.Oid)
} }
// update index // update index & txnh{MinMax}
fs.mu.Lock() fs.mu.Lock()
index.TopPos = it.Txnh.Pos + it.Txnh.Len idx.TopPos = it.Txnh.Pos + it.Txnh.Len
for oid, pos := range update { for oid, pos := range update {
index.Set(oid, pos) idx.Set(oid, pos)
}
fs.txnhMax.CloneFrom(&it.Txnh)
if fs.txnhMin.Len == 0 { // was empty
fs.txnhMin.CloneFrom(&it.Txnh)
} }
fs.mu.Unlock() fs.mu.Unlock()
...@@ -597,11 +609,9 @@ func (fs *FileStorage) watch() (err error) { ...@@ -597,11 +609,9 @@ func (fs *FileStorage) watch() (err error) {
} }
} }
//func (fs *FileStorage) readUpdates() {
//}
func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) { func (fs *FileStorage) Watch(ctx context.Context) (zodb.Tid, []zodb.Oid, error) {
panic("TODO") panic("TODO")
// <-fs.watchq
} }
// --- open + rebuild index --- // --- open + rebuild index ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment