Commit f7789b0e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0c484705
......@@ -101,9 +101,10 @@ type FileStorage struct {
// driver client <- watcher: database commits.
watchq chan<- zodb.WatchEvent
down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
down chan struct{} // ready when storage is no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher
errClose error // error from .file.Close()
watchWg sync.WaitGroup // to wait for watcher finish
}
// IStorageDriver
......@@ -471,6 +472,7 @@ func (fs *FileStorage) Iterate(_ context.Context, tidMin, tidMax zodb.Tid) zodb.
//
// if errFirstRead is !nil, the error of reading first transaction header is sent to it.
func (fs *FileStorage) watcher(w *fsnotify.Watcher, errFirstRead chan<- error) {
defer fs.watchWg.Done()
defer w.Close() // XXX lclose
err := fs._watcher(w, errFirstRead)
// it is ok if we got read error due to file being closed
......@@ -692,7 +694,7 @@ func (fs *FileStorage) shutdown(reason error) {
func (fs *FileStorage) Close() error {
fs.shutdown(fmt.Errorf("closed"))
// XXX wait for watcher?
fs.watchWg.Wait()
if fs.errClose != nil {
return fs.zerr("close", nil, fs.errClose)
......@@ -819,6 +821,7 @@ func Open(ctx context.Context, path string, opt *zodb.DriverOptions) (_ *FileSto
errFirstRead = make(chan error, 1)
}
fs.watchWg.Add(1)
go fs.watcher(w, errFirstRead)
if checkTailGarbage {
......
......@@ -476,13 +476,17 @@ func TestOpenRecovery(t *testing.T) {
workdir := xworkdir(t)
ctx := context.Background()
// checkL runs f on main + voteTail[:l]
// checkL runs f on main + voteTail[:l] . Two cases are verified:
// 1) when index is already present, and
// 2) when initially there is no index.
checkL := func(t *testing.T, l int, f func(t *testing.T, tfs string)) {
t.Run(fmt.Sprintf("tail=+vote%d", l), func(t *testing.T) {
tfs := fmt.Sprintf("%s/1+vote%d.fs", workdir, l)
tfs := fmt.Sprintf("%s/1+vote%d.fs", workdir, l)
t.Run(fmt.Sprintf("oldindex=n/tail=+vote%d", l), func(t *testing.T) {
err := ioutil.WriteFile(tfs, append(main, voteTail[:l]...), 0600); X(err)
err = ioutil.WriteFile(tfs+".index", index, 0600); X(err)
f(t, tfs)
})
t.Run(fmt.Sprintf("oldindex=y/tail=+vote%d", l), func(t *testing.T) {
err := ioutil.WriteFile(tfs+".index", index, 0600); X(err)
f(t, tfs)
})
}
......@@ -495,12 +499,13 @@ func TestOpenRecovery(t *testing.T) {
for _, l := range lok {
checkL(t, l, func(t *testing.T, tfs string) {
fs := xfsopen(t, tfs)
defer func() {
err = fs.Close(); X(err)
}()
head, err := fs.LastTid(ctx); X(err)
if head != lastTidOk {
t.Fatalf("last_tid: %s ; expected: %s", head, lastTidOk)
}
err = fs.Close(); X(err)
})
}
......
......@@ -442,13 +442,11 @@ type WatchEvent struct {
// Watcher allows to be notified of changes to database.
type Watcher interface {
// Watch waits-for and returns next event corresponding to comitted transaction.
//
// XXX queue overflow -> special error?
Watch(ctx context.Context) (WatchEvent, error) // XXX name -> Read? ReadEvent?
// Close stops the watcher.
// err is always nil. XXX ok?
Close() error
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment