Commit 72eb5bc2 authored by Kirill Smelkov's avatar Kirill Smelkov

X zodb: Correct storage not to deadlock driver on watcher failure

parent 18d00625
...@@ -181,9 +181,10 @@ type storage struct { ...@@ -181,9 +181,10 @@ type storage struct {
driver IStorageDriver driver IStorageDriver
l1cache *Cache // can be =nil, if opened with NoCache l1cache *Cache // can be =nil, if opened with NoCache
down chan struct{} // ready when no longer operational down chan struct{} // ready when no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error in watcher|Sync downOnce sync.Once // shutdown may be due to both Close and IO error in watcher|Sync
downErr error // reason for shutdown downErr error // reason for shutdown
drvCloseErr error // err from .driver.Close()
// watcher // watcher
...@@ -206,6 +207,11 @@ func (s *storage) shutdown(reason error) { ...@@ -206,6 +207,11 @@ func (s *storage) shutdown(reason error) {
s.downOnce.Do(func() { s.downOnce.Do(func() {
close(s.down) close(s.down)
s.downErr = fmt.Errorf("not operational due: %s", reason) s.downErr = fmt.Errorf("not operational due: %s", reason)
// - if called by Close or failed Sync: driver.Close will close
// drvWatchq and cause watcher to stop.
// - if called by failed watcher: closing driver will prevent
// drvWatchq<- deadlock in driver because we no longer read from it.
s.drvCloseErr = s.driver.Close()
}) })
} }
...@@ -219,7 +225,7 @@ func (s *storage) Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator ...@@ -219,7 +225,7 @@ func (s *storage) Iterate(ctx context.Context, tidMin, tidMax Tid) ITxnIterator
func (s *storage) Close() error { func (s *storage) Close() error {
s.shutdown(fmt.Errorf("closed")) s.shutdown(fmt.Errorf("closed"))
return s.driver.Close() // this will close drvWatchq and cause watcher stop return s.drvCloseErr
} }
// loading goes through cache - this way prefetching can work // loading goes through cache - this way prefetching can work
...@@ -492,6 +498,14 @@ func (s *storage) Sync(ctx context.Context) (err error) { ...@@ -492,6 +498,14 @@ func (s *storage) Sync(ctx context.Context) (err error) {
} }
// wait till .head >= head // wait till .head >= head
// XXX instead require from drivers that `drv.Sync() -> head`
// guarantees that all EventCommit with .tid <= head were sent to
// watchq
//
// https://lab.nexedi.com/nexedi/ZODB/commit/40116375
// https://github.com/zopefoundation/ZODB/commit/4a6b0283#diff-d2a01f71a79ac2b379e218cf72fa1205d3426cad19e7b72d71899f643be4bb73
//
// ?
watchq := make(chan Event) watchq := make(chan Event)
at = s.AddWatch(watchq) at = s.AddWatch(watchq)
defer s.DelWatch(watchq) defer s.DelWatch(watchq)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment