Commit 6928294a authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Rework WatchLink.serve exit codepath for better clarity

Bring in more structure:

- final watchlink cleanup is done in its own block
- cancelling spawned handlers is done in another block
- add more comments explaining things
parent ad766f75
...@@ -1869,11 +1869,6 @@ func (wlink *WatchLink) serve() { ...@@ -1869,11 +1869,6 @@ func (wlink *WatchLink) serve() {
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
head := wlink.head
head.wlinkMu.Lock()
delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock()
} }
func (wlink *WatchLink) _serve() (err error) { func (wlink *WatchLink) _serve() (err error) {
...@@ -1882,20 +1877,9 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1882,20 +1877,9 @@ func (wlink *WatchLink) _serve() (err error) {
ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout) ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0) ctx, cancel := context.WithCancel(ctx0)
wg := xsync.NewWorkGroup(ctx)
// final watchlink cleanup is done on serve exit
defer func() { defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
err2 := wg.Wait()
if err == nil {
err = err2
}
// unregister all watches created on this wlink // unregister all watches created on this wlink
wlink.byfileMu.Lock() wlink.byfileMu.Lock()
for _, w := range wlink.byfile { for _, w := range wlink.byfile {
...@@ -1906,6 +1890,12 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1906,6 +1890,12 @@ func (wlink *WatchLink) _serve() (err error) {
wlink.byfile = nil wlink.byfile = nil
wlink.byfileMu.Unlock() wlink.byfileMu.Unlock()
// unregister wlink itself
head := wlink.head
head.wlinkMu.Lock()
delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock()
// write to peer if it was logical error on client side // write to peer if it was logical error on client side
if err != nil { if err != nil {
_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err)) _ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
...@@ -1913,12 +1903,33 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1913,12 +1903,33 @@ func (wlink *WatchLink) _serve() (err error) {
// close .sk // close .sk
// closing .sk.tx wakes up rx on client side. // closing .sk.tx wakes up rx on client side.
err2 = wlink.sk.Close() err2 := wlink.sk.Close()
if err == nil { if err == nil {
err = err2 err = err2
} }
}() }()
// watch handlers are spawned in dedicated workgroup
//
// Pin handlers are run either inside - for pins run from setupWatch, or,
// for pins run from readPinWatchers, outside.
// Upon serve exit we cancel watch and pin handlers ran inside and wait for their completion.
wg := xsync.NewWorkGroup(ctx)
defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
// wait for setupWatch and pin handlers spawned from it to complete
err2 := wg.Wait()
if err == nil {
err = err2
}
}()
// cancel main thread on any watch handler error // cancel main thread on any watch handler error
ctx, mainCancel := context.WithCancel(ctx) ctx, mainCancel := context.WithCancel(ctx)
defer mainCancel() defer mainCancel()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment