Commit af0a64cb authored by Kirill Smelkov's avatar Kirill Smelkov

X test for "bye" canceling blocked handlers

parent 12628943
...@@ -1325,12 +1325,17 @@ retry: ...@@ -1325,12 +1325,17 @@ retry:
// //
// XXX error - when? or close watch on any error? // XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
return w._pin(ctx, blk, rev)
}
func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid() foid := w.file.zfile.POid()
revstr := rev.String() revstr := rev.String()
if rev == zodb.TidMax { if rev == zodb.TidMax {
revstr = "head" revstr = "head"
} }
defer xerr.Contextf(&err, "f<%s>: wlink%d: pin #%d @%s", foid, w.link.id, blk, revstr) defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)
// XXX locking? // XXX locking?
// XXX simultaneous calls? // XXX simultaneous calls?
...@@ -1464,7 +1469,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1464,7 +1469,7 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
blk := blk blk := blk
rev := rev rev := rev
wg.Go(func() error { wg.Go(func() error {
return w.pin(ctx, blk, rev) return w._pin(ctx, blk, rev)
}) })
} }
err = wg.Wait() err = wg.Wait()
......
...@@ -641,6 +641,7 @@ class tWatchLink: ...@@ -641,6 +641,7 @@ class tWatchLink:
t._accepted = set() # of stream streams we accepted but did not replied yet t._accepted = set() # of stream streams we accepted but did not replied yet
t._txmu = threading.Lock() # serializes writes t._txmu = threading.Lock() # serializes writes
t._txclosed = False
serveCtx, t._serveCancel = context.with_cancel(context.background()) serveCtx, t._serveCancel = context.with_cancel(context.background())
t._serveWG = sync.WorkGroup(serveCtx) t._serveWG = sync.WorkGroup(serveCtx)
...@@ -651,17 +652,25 @@ class tWatchLink: ...@@ -651,17 +652,25 @@ class tWatchLink:
tdb._wlinks.add(t) tdb._wlinks.add(t)
def close(t): def _closeTX(t):
t.tdb._wlinks.remove(t) if t._txclosed:
return
t._serveCancel()
# ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
# _serveRX on client (= on us). The connection can be already closed by # _serveRX on client (= on us). The connection can be already closed by
# wcfs - so ignore errorswhen sending bye. # wcfs - so ignore errors when sending bye.
try: try:
t._send(1, b'bye') t._send(1, b'bye')
except IOError: except IOError:
pass pass
t._wtx.close()
t._txclosed = True
def close(t):
t.tdb._wlinks.remove(t)
t._closeTX()
t._serveCancel()
# XXX we can get stuck here if wcfs does not behave as we want. # XXX we can get stuck here if wcfs does not behave as we want.
# XXX in particular if there is a silly - e.g. syntax or type error in # XXX in particular if there is a silly - e.g. syntax or type error in
# test code - we curently get stuck here. # test code - we curently get stuck here.
...@@ -674,7 +683,6 @@ class tWatchLink: ...@@ -674,7 +683,6 @@ class tWatchLink:
if e != context.canceled: if e != context.canceled:
reraise(e, None, e.__traceback__) reraise(e, None, e.__traceback__)
t._wtx.close()
t._wrx.close() t._wrx.close()
...@@ -1133,13 +1141,30 @@ def test_wcfs(): ...@@ -1133,13 +1141,30 @@ def test_wcfs():
# >>> XXX commit data to not yet accessed f part - nothing happens # >>> XXX commit data to not yet accessed f part - nothing happens
""" # """
# >>> invalidation protocol # >>> invalidation protocol
print('\n\n inv. protocol \n\n') print('\n\n inv. protocol \n\n')
# closeTX/bye cancels blocked pin handlers
wl = t.openwatch()
wg = sync.WorkGroup(timeout())
def _(ctx):
assert wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at2))) == \
"error setup watch f<%s> @%s: pin #%d @%s: context canceled" % \
(h(zf._p_oid), h(at2), 2, h(at2))
wg.go(_)
def _(ctx):
req = wl.recvReq(ctx)
assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at2))
# don't reply to req - close instead
wl._closeTX()
wg.go(_)
wg.wait()
# invalid requests -> wcfs replies error # invalid requests -> wcfs replies error
wl = t.openwatch() wl = t.openwatch()
assert wl.sendReq(context.background(), b'bla bla') == \ assert wl.sendReq(timeout(), b'bla bla') == \
b'error bad watch: not a watch request: "bla bla"' b'error bad watch: not a watch request: "bla bla"'
# invalid request not following frame structure -> fatal + wcfs must close watch link # invalid request not following frame structure -> fatal + wcfs must close watch link
...@@ -1181,7 +1206,7 @@ def test_wcfs(): ...@@ -1181,7 +1206,7 @@ def test_wcfs():
for at in revv[1:]: for at in revv[1:]:
wl.watch(zf, at) wl.watch(zf, at)
wl.close() wl.close()
""" # """
# XXX move before setup watch? # XXX move before setup watch?
print('\n\n\n\nWATCH+COMMIT\n\n') print('\n\n\n\nWATCH+COMMIT\n\n')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment