Commit ffef2a72 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3d7bbff5
...@@ -91,17 +91,17 @@ type FileStorage struct { ...@@ -91,17 +91,17 @@ type FileStorage struct {
// protects updates to index and to txnh{Min,Max} - in other words // protects updates to index and to txnh{Min,Max} - in other words
// to everything that depends on what current last transaction is. // to everything that depends on what current last transaction is.
mu sync.RWMutex mu sync.RWMutex
index *Index // oid -> data record position in transaction which last changed oid
index *Index // oid -> data record position in transaction which last changed oid txnhMin TxnHeader // transaction headers for min/max transactions committed
txnhMax TxnHeader // (both with .Len=0 & .Tid=0 if database is empty)
// transaction headers for min/max transactions committed
// (both with .Len=0 & .Tid=0 if database is empty)
txnhMin TxnHeader
txnhMax TxnHeader
// driver client <- watcher: data file updates. // driver client <- watcher: data file updates.
watchq chan watchEvent watchq chan watchEvent
down chan struct{} // ready when FileStorage is no longer operational
downOnce sync.Once
errClose error // error from .file.Close()
} }
// IStorageDriver // IStorageDriver
...@@ -451,9 +451,19 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb. ...@@ -451,9 +451,19 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
func (fs *FileStorage) watcher(w *fsnotify.Watcher) { func (fs *FileStorage) watcher(w *fsnotify.Watcher) {
defer w.Close() // XXX lclose defer w.Close() // XXX lclose
err := fs._watcher(w) err := fs._watcher(w)
// it is ok if we got read error due to file being closed
if e, _ := errors.Cause(err).(*os.PathError); e != nil && e.Err == os.ErrClosed {
select {
case <-fs.down:
err = nil
default:
}
}
// XXX fs.watchErr = err (-> fail other operations) // XXX fs.watchErr = err (-> fail other operations)
_ = err if err != nil {
log.Print(err) log.Print(err)
}
} }
func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) { func (fs *FileStorage) _watcher(w *fsnotify.Watcher) (err error) {
...@@ -474,7 +484,9 @@ mainloop: ...@@ -474,7 +484,9 @@ mainloop:
if !first { if !first {
//tracef("select ...") //tracef("select ...")
select { select {
// XXX handle close case <-fs.down:
// closed
return nil
case err := <-w.Errors: case err := <-w.Errors:
//tracef("error: %s", err) //tracef("error: %s", err)
...@@ -506,7 +518,7 @@ mainloop: ...@@ -506,7 +518,7 @@ mainloop:
case fsize == idx.TopPos: case fsize == idx.TopPos:
continue // same as before continue // same as before
case fsize < idx.TopPos: case fsize < idx.TopPos:
// XXX add pack support // XXX add pack support?
return fmt.Errorf("file truncated (%d -> %d)", idx.TopPos, fsize) return fmt.Errorf("file truncated (%d -> %d)", idx.TopPos, fsize)
} }
...@@ -592,11 +604,15 @@ mainloop: ...@@ -592,11 +604,15 @@ mainloop:
} }
fs.mu.Unlock() fs.mu.Unlock()
tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv) // XXX oidv=[0,0] - recheck tracef("-> tid=%s δoidv=%v", it.Txnh.Tid, oidv)
select {
case <-fs.down:
return nil
// XXX cancel on close case fs.watchq <- watchEvent{it.Txnh.Tid, oidv}:
fs.watchq <- watchEvent{it.Txnh.Tid, oidv} // ok
//tracef("zzz") }
} }
} }
} }
...@@ -607,30 +623,35 @@ type watchEvent struct { ...@@ -607,30 +623,35 @@ type watchEvent struct {
oidv []zodb.Oid oidv []zodb.Oid
} }
// XXX doc
func (fs *FileStorage) Watch(ctx context.Context) (_ zodb.Tid, _ []zodb.Oid, err error) { func (fs *FileStorage) Watch(ctx context.Context) (_ zodb.Tid, _ []zodb.Oid, err error) {
defer xerr.Contextf(&err, "%s: watch", fs.file.Name()) defer xerr.Contextf(&err, "%s: watch", fs.file.Name())
// XXX handle close
//tracef("watch -> select ...")
select { select {
case <-ctx.Done(): case <-ctx.Done():
//tracef("\t-> canceled")
return zodb.InvalidTid, nil, ctx.Err() return zodb.InvalidTid, nil, ctx.Err()
case <-fs.down:
return zodb.InvalidTid, nil, os.ErrClosed // FIXME -> proper error
case w := <-fs.watchq: case w := <-fs.watchq:
//tracef("\t-> data")
return w.tid, w.oidv, nil return w.tid, w.oidv, nil
} }
} }
// --- open + rebuild index --- // --- open + rebuild index ---
func (fs *FileStorage) Close() error { func (fs *FileStorage) shutdown() {
// XXX stop watcher fs.downOnce.Do(func() {
fs.errClose = fs.file.Close()
close(fs.down)
})
}
err := fs.file.Close() func (fs *FileStorage) Close() error {
if err != nil { fs.shutdown()
return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: err} if fs.errClose != nil {
return &zodb.OpError{URL: fs.URL(), Op: "close", Args: nil, Err: fs.errClose}
} }
// TODO if opened !ro -> .saveIndex() // TODO if opened !ro -> .saveIndex()
...@@ -644,6 +665,7 @@ func (fs *FileStorage) Close() error { ...@@ -644,6 +665,7 @@ func (fs *FileStorage) Close() error {
func Open(ctx context.Context, path string) (_ *FileStorage, err error) { func Open(ctx context.Context, path string) (_ *FileStorage, err error) {
fs := &FileStorage{ fs := &FileStorage{
watchq: make(chan watchEvent), watchq: make(chan watchEvent),
down: make(chan struct{}),
} }
f, err := os.Open(path) f, err := os.Open(path)
......
...@@ -29,6 +29,8 @@ import ( ...@@ -29,6 +29,8 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
...@@ -468,5 +470,7 @@ func TestWatch(t *testing.T) { ...@@ -468,5 +470,7 @@ func TestWatch(t *testing.T) {
} }
_, _, err = fs.Watch(ctx) _, _, err = fs.Watch(ctx)
// XXX assert err = "closed" if e, eWant := errors.Cause(err), os.ErrClosed; e != eWant {
t.Fatalf("watch after close -> %v; want: cause %v", err, eWant)
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment