Commit cb68775a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent eaea13a5
......@@ -260,6 +260,7 @@ func (s *storage) watcher() {
switch e := event.(type) {
default:
// XXX -> just log?
panic(fmt.Sprintf("unexpected event: %T", e))
case *EventError:
......@@ -305,16 +306,59 @@ func (s *storage) watcher() {
// AddWatch implements Watcher.
func (s *storage) AddWatch(watchq chan<- Event) (at0 Tid) {
// XXX when already Closed? -> `go watchq <- .downErr + close(watchq)`
ack := make(chan Tid)
s.watchReq <- watchRequest{addWatch, ack, watchq}
select {
// no longer operational: behave if watchq was registered before that
// and then seen down/close events. Interact with DelWatch directly.
case <-s.down:
s.watchMu.Lock()
_, already := s.watchTab[watchq]
if already {
s.watchMu.Unlock()
return // multiple AddWatch
}
s.watchTab[watchq] = struct{}{}
cancel := make(chan struct{})
s.watchCancel[watchq] = cancel
s.watchMu.Unlock()
go func() {
if s.downErr != nil {
select {
case <-cancel:
return
case watchq <- &EventError{s.downErr}:
// ok
}
}
close(watchq)
}()
return s.drvHead
// operational - interact with watcher
case s.watchReq <- watchRequest{addWatch, ack, watchq}:
return <-ack
}
}
// DelWatch implements Watcher.
func (s *storage) DelWatch(watchq chan<- Event) {
// XXX when already Closed? -> noop
ack := make(chan Tid)
s.watchReq <- watchRequest{delWatch, ack, watchq}
select {
// no longer operational - interact with AddWatch directly.
case <-s.down:
s.watchMu.Lock()
delete(s.watchTab, watchq)
cancel := s.watchCancel[watchq]
if cancel != nil {
delete(s.watchCancel, watchq)
close(cancel)
}
s.watchMu.Unlock()
// operational - interact with watcher
case s.watchReq <- watchRequest{delWatch, ack, watchq}:
<-ack
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment