Commit 43915fe9 authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: Don't forbid simultaneous watch requests

The limitation that only one simultaneous watch request over one wlink
is possible was added in April (b4857f66) when both Watch and WatchLink
locking was not there and marked as XXX. That locking was added in July
(85d86a32) though, so there should be real need to simultaneous watch
requests over one wlink.

Also the limit implementation had a bug: it was setting handlingWatch
back to 0, but _after_ sending reply to client. This way a situation was
possible when client is woken up first, sends another watch request,
wcfs was not yet scheduled, handlingWatch is still _1_, and watch
request is rejected. This bug is very likely to happen when running wcfs
tests with 2 CPU machine or with just GOMAXPROCS=2:

    C: setup watch f<0000000000000043> @at1 (03d2c23f46d04dcc)
    #  pinok: {2: @at1 (03d2c23f46d04dcc), 3: @at0 (03d2c23f46c44300), 5: @at0 (03d2c23f46c44300)}
    S: wlink 6: rx: "1 watch 0000000000000043 @03d2c23f46d04dcc\n"
    S: wlink 6: tx: "2 pin 0000000000000043 #3 @03d2c23f46c44300\n"
    C: watch  : rx: '2 pin 0000000000000043 #3 @03d2c23f46c44300\n'
    S: wlink 6: tx: "4 pin 0000000000000043 #2 @03d2c23f46d04dcc\n"
    S: wlink 6: tx: "6 pin 0000000000000043 #5 @03d2c23f46c44300\n"
    C: watch  : rx: '4 pin 0000000000000043 #2 @03d2c23f46d04dcc\n'
    C: watch  : rx: '6 pin 0000000000000043 #5 @03d2c23f46c44300\n'
    S: wlink 6: rx: "2 ack\n"
    S: wlink 6: rx: "4 ack\n"
    S: wlink 6: rx: "6 ack\n"
    S: wlink 6: tx: "1 ok\n"
    C: watch  : rx: '1 ok\n'

    C: setup watch f<0000000000000043> (@at1 (03d2c23f46d04dcc) ->) @at2 (03d2c23f46e91daa)
    # pin@old: {2: @at1 (03d2c23f46d04dcc), 3: @at0 (03d2c23f46c44300), 5: @at0 (03d2c23f46c44300)}
    # pin@new: {2: @at2 (03d2c23f46e91daa), 3: @at2 (03d2c23f46e91daa), 5: @at2 (03d2c23f46e91daa)}
    #  pinok: {2: @at2 (03d2c23f46e91daa), 3: @at2 (03d2c23f46e91daa), 5: @at2 (03d2c23f46e91daa)}
    S: wlink 6: rx: "3 watch 0000000000000043 @03d2c23f46e91daa\n"
    S: wlink 6: tx: "0 error: 3: another watch request is already in progress\n"
    C: watch  : rx: '0 error: 3: another watch request is already in progress\n'
    C: watch  : rx fatal: 'error: 3: another watch request is already in progress'
    C: watch  : rx: ''

If we would need to maintain the limit, we should move setting
handlingWatch=0 just before sending final reply to client, but since the
need for the limit is not there anymore, let's fix it by removing the
limit altogether.
parent 26269f8e
...@@ -59,6 +59,10 @@ setenv = ...@@ -59,6 +59,10 @@ setenv =
wcfs:1: WENDELIN_CORE_VIRTMEM=r:wcfs+w:uvmm wcfs:1: WENDELIN_CORE_VIRTMEM=r:wcfs+w:uvmm
wcfs:1: GOMAXPROCS=1 wcfs:1: GOMAXPROCS=1
# ----//---- 2 main OS threads in wcfs
wcfs:2: WENDELIN_CORE_VIRTMEM=r:wcfs+w:uvmm
wcfs:2: GOMAXPROCS=2
commands= {envpython} setup.py test commands= {envpython} setup.py test
# XXX setenv = TMPDIR = ... ? (so that /tmp is not on tmpfs and we don't run out of memory on bench) # XXX setenv = TMPDIR = ... ? (so that /tmp is not on tmpfs and we don't run out of memory on bench)
# + {envpython} setup.py bench (?) # + {envpython} setup.py bench (?)
...@@ -1789,10 +1789,6 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1789,10 +1789,6 @@ func (wlink *WatchLink) _serve() (err error) {
return e return e
}) })
// allow for only 1 watch request at a time
// TODO allow simultaneous watch requests for different files
var handlingWatch int32
for { for {
l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS l, err := r.ReadString('\n') // XXX limit accepted line len to prevent DOS
if err != nil { if err != nil {
...@@ -1833,12 +1829,7 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1833,12 +1829,7 @@ func (wlink *WatchLink) _serve() (err error) {
} }
// watch ... // watch ...
if atomic.LoadInt32(&handlingWatch) != 0 {
return fmt.Errorf("%d: another watch request is already in progress", stream)
}
atomic.StoreInt32(&handlingWatch, 1)
wg.Go(func() error { wg.Go(func() error {
defer atomic.StoreInt32(&handlingWatch, 0)
return wlink.handleWatch(ctx, stream, msg) return wlink.handleWatch(ctx, stream, msg)
}) })
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment