Commit b9efdbbb authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0f398ab8
......@@ -594,7 +594,7 @@ type WatchLink struct {
byfile map[zodb.Oid]*Watch // {} foid -> Watch
// IO
reqNext uint64 // stream ID for next wcfs-originated request
reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages
txMu sync.Mutex
rxMu sync.Mutex
rxTab map[/*stream*/uint64]chan string // client replies go via here
......@@ -1498,6 +1498,7 @@ func (wlink *WatchLink) serve() {
}
// XXX locking
delete(wlink.head.wlinkTab, wlink)
// XXX deactiwate all watches from wlink.byfile[*]
}
func (wlink *WatchLink) _serve() (err error) {
......@@ -1514,9 +1515,15 @@ func (wlink *WatchLink) _serve() (err error) {
err = err2
}
// XXX write to peer if it was logical error on client side
// XXX on which stream? -1? -> better 0 as in HTTP/2?
// XXX then -> CloseWrite
// write to peer if it was logical error on client side
// then .sk.tx to wakeup rx on client side
if err != nil {
_ = wlink.send(ctx, 0, fmt.Sprintf("error: %s", err))
}
err2 = wlink.sk.CloseWrite()
if err == nil {
err = err2
}
}()
// close .sk.rx on error/wcfs stopping or return. closing .sk.rx wakes up read(sk)
......@@ -1630,7 +1637,10 @@ func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
// sendReq sends wcfs-originated request to client and returns client response.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
// XXX err ctx
stream := atomic.AddUint64(&wlink.reqNext, +2)
var stream uint64
for stream == 0 {
stream = atomic.AddUint64(&wlink.reqNext, +2)
}
rxq := make(chan string) // XXX cap=1? (so that if we return canceled we do not block client)
wlink.rxMu.Lock()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment