Commit 5d0a19af authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6b0fddc2
...@@ -1608,13 +1608,11 @@ func (wlink *WatchLink) _serve() (err error) { ...@@ -1608,13 +1608,11 @@ func (wlink *WatchLink) _serve() (err error) {
func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) { func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) {
defer xerr.Contextf(&err, "%d", stream) defer xerr.Contextf(&err, "%d", stream)
// FIXME error "at is too far away" from setupWatch shuold be not fatal
// (XXX bad parse watch -> fatal (not following protocol))
// XXX kill fatal replies? (i.e. make almost everything non-fatal ?)
err = wlink._handleWatch(ctx, msg) err = wlink._handleWatch(ctx, msg)
reply := "ok" reply := "ok"
if err != nil { if err != nil {
reply = fmt.Sprintf("error %s", err) reply = fmt.Sprintf("error %s", err)
err = nil
} }
err2 := wlink.send(ctx, stream, reply) err2 := wlink.send(ctx, stream, reply)
......
...@@ -477,8 +477,8 @@ class tWatchLink: ...@@ -477,8 +477,8 @@ class tWatchLink:
t._wrx = os.fdopen(wh, 'rb') t._wrx = os.fdopen(wh, 'rb')
t._wtx = os.fdopen(wh2, 'wb') t._wtx = os.fdopen(wh2, 'wb')
# .rx_eof becomes ready when wcfs closes its tx side t.rx_eof = chan() # .rx_eof becomes ready when wcfs closes its tx side
t.rx_eof = chan() t.fatalv = [] # fatal messages go here
# inv.protocol message IO # inv.protocol message IO
t._acceptq = chan() # (stream, msg) server originated messages go here t._acceptq = chan() # (stream, msg) server originated messages go here
...@@ -555,6 +555,7 @@ class tWatchLink: ...@@ -555,6 +555,7 @@ class tWatchLink:
if stream == 0: # control/fatal message from wcfs if stream == 0: # control/fatal message from wcfs
# XXX print -> receive somewhere? # XXX print -> receive somewhere?
print('C: watch : rx fatal: %r' % msg) print('C: watch : rx fatal: %r' % msg)
t.fatalv.append(msg)
continue continue
reply = bool(stream % 2) reply = bool(stream % 2)
...@@ -586,8 +587,11 @@ class tWatchLink: ...@@ -586,8 +587,11 @@ class tWatchLink:
# XXX +ctx? # XXX +ctx?
def _send(t, stream, msg): def _send(t, stream, msg):
assert '\n' not in msg assert '\n' not in msg
pkt = b"%d %s\n" % (stream, msg)
t._write(pkt)
def _write(t, pkt):
with t._txmu: with t._txmu:
pkt = b"%d %s\n" % (stream, msg)
#print('C: watch : tx: %r' % pkt) #print('C: watch : tx: %r' % pkt)
t._wtx.write(pkt) t._wtx.write(pkt)
t._wtx.flush() t._wtx.flush()
...@@ -886,21 +890,22 @@ def test_wcfs(): ...@@ -886,21 +890,22 @@ def test_wcfs():
# >>> invalidation protocol # >>> invalidation protocol
print('\n\n inv. protocol \n\n') print('\n\n inv. protocol \n\n')
# invalid requests -> wcfs replies error
# XXX invalid request not following frame structure `<stream> ...`
# invalid requests -> wcfs replies error
wl = t.openwatch() wl = t.openwatch()
assert wl.sendReq(context.background(), b'bla bla') == \ assert wl.sendReq(context.background(), b'bla bla') == \
b'error bad watch: not a watch request: "bla bla"' b'error bad watch: not a watch request: "bla bla"'
# wcfs must close watch link after invalid request # invalid request not following frame structure -> fatal + wcfs must close watch link
assert wl.fatalv == []
wl._write(b'zzz hello\n')
_, _rx = select( _, _rx = select(
time.after(3*time.second).recv, time.after(3*time.second).recv,
wl.rx_eof.recv, wl.rx_eof.recv,
) )
if _ == 0: if _ == 0:
raise RuntimeError("%s: did not rx EOF after invalid watch request" % wl) raise RuntimeError("%s: did not rx EOF after bad frame " % wl)
assert wl.fatalv == [] # XXX
wl.close() wl.close()
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment