Commit 8c05bc4c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4235f5c6
...@@ -33,10 +33,11 @@ from persistent import Persistent ...@@ -33,10 +33,11 @@ from persistent import Persistent
from persistent.timestamp import TimeStamp from persistent.timestamp import TimeStamp
from ZODB.utils import z64, u64, p64 from ZODB.utils import z64, u64, p64
import sys, os, os.path, subprocess, threading, inspect, traceback import sys, os, os.path, subprocess, threading, inspect, traceback, re
from errno import EINVAL from errno import EINVAL
from golang import go, chan, func, defer, select from golang import go, chan, func, defer, select
from golang import context, sync, time from golang import context, sync, time
from golang.gcompat import qq
from zodbtools.util import ashex as h, fromhex from zodbtools.util import ashex as h, fromhex
from pytest import raises from pytest import raises
from six import reraise from six import reraise
...@@ -676,6 +677,28 @@ class tSrvReq: ...@@ -676,6 +677,28 @@ class tSrvReq:
# XXX also track as answered? (and don't accept with the same ID ?) # XXX also track as answered? (and don't accept with the same ID ?)
def _parse(req): # -> (foid, blk, at|None)
# pin <foid> #<blk> @(<at>|head)
m = re.match(b"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(P<at>[^ ]+)$", req.msg)
if m is None:
raise RuntimeError("message is not valid pin request: %s" % qq(req.msg))
foid = fromhex(m.group('foid'))
blk = int(m.group('blk'))
at = m.group('at')
if at == "head":
at = None
else:
at = fromhex(at)
return foid, blk, at
@property
def foid(req): return req._parse()[0]
@property
def blk(req): return req._parse()[1]
@property
def at(req): return req._parse()[2]
# ---- watch setup/adjust ---- # ---- watch setup/adjust ----
...@@ -768,7 +791,7 @@ def watch(twlink, zf, at): # XXX -> ? ...@@ -768,7 +791,7 @@ def watch(twlink, zf, at): # XXX -> ?
twlink._watch(zf, at, pinok, "ok") twlink._watch(zf, at, pinok, "ok")
# _watch sends watch request for zf@at, expects initial pins specified by pinok, and finaly reply. # _watch sends watch request for zf@at, expects initial pins specified by pinok, and final reply.
# #
# pinok: {} blk -> at that have to be pinned. # pinok: {} blk -> at that have to be pinned.
# if replyok ends with '…' only reply prefix until the dots is checked. # if replyok ends with '…' only reply prefix until the dots is checked.
...@@ -776,6 +799,8 @@ def watch(twlink, zf, at): # XXX -> ? ...@@ -776,6 +799,8 @@ def watch(twlink, zf, at): # XXX -> ?
def _watch(twlink, zf, at, pinok, replyok): def _watch(twlink, zf, at, pinok, replyok):
# send watch request and check that we receive pins for in-cache blocks # send watch request and check that we receive pins for in-cache blocks
# changed > at. Use timeout to detect wcfs replying less pins than expected. # changed > at. Use timeout to detect wcfs replying less pins than expected.
"""
# #
# XXX detect not sent pins with timeout, or better via ack'ing previous # XXX detect not sent pins with timeout, or better via ack'ing previous
# pins as they come in (not waiting for all of them) and then seeing that # pins as they come in (not waiting for all of them) and then seeing that
...@@ -812,14 +837,73 @@ def _watch(twlink, zf, at, pinok, replyok): ...@@ -812,14 +837,73 @@ def _watch(twlink, zf, at, pinok, replyok):
wg.go(_) wg.go(_)
wg.wait() wg.wait()
"""
def _(ctx):
reply = twlink.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at)))
if replyok.endswith('…'):
rok = replyok[:-len('…')]
assert reply[:len(rok)] == rok
else:
assert reply == replyok
doCheckingPin(_, {twlink: pinok})
twlink._watching[zf] = at twlink._watching[zf] = at
# doCheckingPin calls f and verifies that wcfs sends expected pins during the
# time f executes.
#
# f(ctx) # XXX + ev?
# pinokByWLink: {} tWatchLink -> {} blk -> at.
# pinfunc(wlink, foid, blk, at) | None. XXX foid -> ZBigFile?
#
# pinfunc is called after pin request is received from wcfs but before pin ack
# is replied back. pinfunc must not block.
def doCheckingPin(f, pinokByWLink, pinfunc=None):
# call f and check that we receive pins as specified.
# Use timeout to detect wcfs replying less pins than expected.
#
# XXX detect not sent pins with timeout, or better via ack'ing previous
# pins as they come in (not waiting for all of them) and then seeing that
# we did not received expeced pin when wcfs sends final ok?
ctx, cancel = with_timeout()
wg = sync.WorkGroup(ctx)
for wlink, pinok in pinokByWLink.items():
def _(ctx, wlink):
pinv = wlink._expectPin(ctx, zf, pinok) # XXX zf?
tdelay() # increase probability to receive erroneous extra pins
for p in pinv:
if pinfunc is not None:
pinfunc(wlink, p.foid, p.blk, p.at)
p.reply(b"ack")
# check that we don't get extra pins before reply to "watch"
try:
req = wlink.recvReq(ctx)
except Exception as e:
if e is context.canceled:
return # cancel is expected after seeing "ok"
reraise(e, None, e.__traceback__)
assert False, "extra pin message received: %r" % req.msg
wg.go(_, wlink)
def _(ctx):
f(ctx)
# cancel _expectPin waiting upon completing f
# -> error that missed pins were not received.
cancel()
wg.go(_)
wg.wait()
# _expectPin asserts that wcfs sends expected pin messages. # _expectPin asserts that wcfs sends expected pin messages.
# #
# expect is {} blk -> at # expect is {} blk -> at
# returns [] of received pin requests. # returns [] of received pin requests.
@func(tWatchLink) @func(tWatchLink)
def _expectPin(twlink, ctx, zf, expect): def _expectPin(twlink, ctx, zf, expect): # -> []tSrvReq
expected = set() # of expected pin messages expected = set() # of expected pin messages
for blk, at in expect.items(): for blk, at in expect.items():
hat = h(at) if at is not None else 'head' hat = h(at) if at is not None else 'head'
...@@ -969,11 +1053,13 @@ def test_wcfs(): ...@@ -969,11 +1053,13 @@ def test_wcfs():
at4 = t.commit() at4 = t.commit()
f.assertCache([1,1,0,1,0,0]) # FIXME a must be invalidated - see δbtree ^^^ f.assertCache([1,1,0,1,0,0]) # FIXME a must be invalidated - see δbtree ^^^
"""
ctx, cancel = with_timeout() ctx, cancel = with_timeout()
wg = sync.WorkGroup(ctx) wg = sync.WorkGroup(ctx)
"""
blk = 2 blk = 2
pinok = {2: at3} pinok = {2: at3} # XXX at3 -> at <= wl.at for zf
# XXX 5, {5: ø} # XXX 5, {5: ø}
# XXX 0, {0, at3} after δbtree works # XXX 0, {0, at3} after δbtree works
...@@ -981,6 +1067,32 @@ def test_wcfs(): ...@@ -981,6 +1067,32 @@ def test_wcfs():
blk_data = f.blk(blk) blk_data = f.blk(blk)
assert f.cached()[blk] == 0 assert f.cached()[blk] == 0
def _(ctx):
assert f.cached()[blk] == 0
ev.append('read pre')
# access data with released GIL so that the thread that reads data from
# head/watch can receive pin message. Be careful to handle cancelation,
# so that on error in another worker we don't stuck and the error is
# successfully propagated to wait and reported.
have_read = chan(1)
def _():
b = read0_nogil(blk_data)
have_read.send(b)
go(_)
_, _rx = select(
ctx.done().recv, # 0
have_read.recv, # 1
)
if _ == 0:
raise ctx.err()
b = _rx
ev.append('read ' + chr(b))
doCheckingPin(_, {wl: pinok})
"""
# XXX dup wrte tWatchLink._watch # XXX dup wrte tWatchLink._watch
ev = [] ev = []
...@@ -1032,7 +1144,8 @@ def test_wcfs(): ...@@ -1032,7 +1144,8 @@ def test_wcfs():
wg.go(_) wg.go(_)
wg.wait() wg.wait()
assert ev == ['read pre', 'pin rx', 'pin ack pre', 'read 4'] # 4 - read @head """
assert ev == ['read pre', 'pin rx', 'pin ack pre', 'read 4'] # 4 - read @head # XXX depend on blk
assert f.cached()[blk] > 0 assert f.cached()[blk] > 0
f.assertBlk(blk, '4c') f.assertBlk(blk, '4c')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment