Commit 6454fbcf authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 09b49b68
...@@ -692,7 +692,14 @@ func traceZWatch(format string, argv ...interface{}) { ...@@ -692,7 +692,14 @@ func traceZWatch(format string, argv ...interface{}) {
// zwatcher watches for ZODB changes. // zwatcher watches for ZODB changes.
// //
// see "4) when we receive an invalidation message from ZODB ..." // see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err error) { func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) {
err := root._zwatcher(ctx, zwatchq)
if err != nil {
panic(err) // XXX -> wg.Wait in main
}
}
func (root *Root) _zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err error) {
defer xerr.Contextf(&err, "zwatch %s", root.zstor.URL()) defer xerr.Contextf(&err, "zwatch %s", root.zstor.URL())
// XXX error -> always EIO for data operations // XXX error -> always EIO for data operations
// XXX unmount -> stop // XXX unmount -> stop
...@@ -726,13 +733,18 @@ func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err er ...@@ -726,13 +733,18 @@ func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err er
return zevent.Err return zevent.Err
case *zodb.EventCommit: case *zodb.EventCommit:
root.handleδZ(zevent) err = root.handleδZ(zevent)
if err != nil {
return err
}
} }
} }
} }
// handleδZ handles 1 change event from ZODB notification. // handleδZ handles 1 change event from ZODB notification.
func (root *Root) handleδZ(δZ *zodb.EventCommit) { func (root *Root) handleδZ(δZ *zodb.EventCommit) (err error) {
defer xerr.Contextf(&err, "handleδZ @%s", δZ.Tid)
head := root.head head := root.head
// while we are invalidating OS cache, make sure that nothing, that // while we are invalidating OS cache, make sure that nothing, that
...@@ -803,9 +815,9 @@ retry: ...@@ -803,9 +815,9 @@ retry:
}) })
} }
} }
err := wg.Wait() err = wg.Wait()
if err != nil { if err != nil {
panic(err) // XXX return err
} }
// invalidate kernel cache for attributes // invalidate kernel cache for attributes
...@@ -824,7 +836,7 @@ retry: ...@@ -824,7 +836,7 @@ retry:
} }
err = wg.Wait() err = wg.Wait()
if err != nil { if err != nil {
panic(err) // XXX return err
} }
// resync .zhead to δZ.tid // resync .zhead to δZ.tid
...@@ -835,7 +847,7 @@ retry: ...@@ -835,7 +847,7 @@ retry:
_, ctx = transaction.New(context.Background()) // XXX bg ok? _, ctx = transaction.New(context.Background()) // XXX bg ok?
err = zhead.Resync(ctx, δZ.Tid) err = zhead.Resync(ctx, δZ.Tid)
if err != nil { if err != nil {
panic(err) // XXX return err
} }
zhead.txnCtx = ctx zhead.txnCtx = ctx
...@@ -845,7 +857,7 @@ retry: ...@@ -845,7 +857,7 @@ retry:
for file := range δF.ByFile { for file := range δF.ByFile {
size, sizePath, err := file.zfile.Size(ctx) size, sizePath, err := file.zfile.Size(ctx)
if err != nil { if err != nil {
panic(err) // XXX return err
} }
file.size = size file.size = size
...@@ -874,6 +886,8 @@ retry: ...@@ -874,6 +886,8 @@ retry:
close(w.ready) close(w.ready)
} }
} }
return nil
} }
// hwaiter represents someone waiting for zhead to become ≥ at. // hwaiter represents someone waiting for zhead to become ≥ at.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment