Commit dd56e679 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c541e437
...@@ -33,7 +33,7 @@ $WENDELIN_CORE_WCFS_AUTOSTART ...@@ -33,7 +33,7 @@ $WENDELIN_CORE_WCFS_AUTOSTART
$WENDELIN_CORE_WCFS_OPTIONS $WENDELIN_CORE_WCFS_OPTIONS
""" """
import os, sys, hashlib, tempfile, subprocess, time import os, sys, hashlib, tempfile, subprocess, time, re
import logging as log import logging as log
from os.path import dirname from os.path import dirname
from errno import ENOENT, EEXIST from errno import ENOENT, EEXIST
...@@ -46,7 +46,7 @@ import threading ...@@ -46,7 +46,7 @@ import threading
from persistent import Persistent from persistent import Persistent
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB.utils import z64, u64, p64 from ZODB.utils import z64, u64, p64
from zodbtools.util import ashex as h # XXX -> use "ashex" from zodbtools.util import ashex as h, fromhex # XXX -> use "ashex"
from six import reraise from six import reraise
...@@ -236,6 +236,94 @@ class WatchLink(object): ...@@ -236,6 +236,94 @@ class WatchLink(object):
wlink._wtx.write(pkt) wlink._wtx.write(pkt)
wlink._wtx.flush() wlink._wtx.flush()
# sendReq sends client -> server request and returns server reply.
#
# only 1 sendReq must be used at a time. # XXX relax?
def sendReq(wlink, ctx, req): # -> reply | None when EOF
rxq = wlink._sendReq(ctx, req)
_, _rx = select(
ctx.done().recv, # 0
rxq.recv, # 1
)
if _ == 0:
raise ctx.err()
return _rx
def _sendReq(wlink, ctx, req): # -> rxq
stream = 1 # XXX -> dynamic
rxq = chan()
with wlink._rxmu:
assert stream not in wlink._rxtab # XXX !test assert - recheck
wlink._rxtab[stream] = rxq
wlink._send(stream, req)
return rxq
# recvReq receives client <- server request.
#
# multiple recvReq could be used at a time.
def recvReq(wlink, ctx): # -> SrvReq | None when EOF
_, _rx = select(
ctx.done().recv, # 0
wlink._acceptq.recv, # 1
)
if _ == 0:
raise ctx.err()
rx = _rx
if rx is None:
return rx
stream, msg = rx
return SrvReq(wlink, stream, msg)
# SrvReq represents 1 server-initiated wcfs request received over /head/watch link.
# XXX struct place -> ^^^ (nearby WatchLink) ?
class SrvReq(object):
def __init__(req, wlink, stream, msg):
req.wlink = wlink
req.stream = stream
req.msg = msg
def reply(req, answer):
#print('C: reply %s <- %r ...' % (req, answer))
wlink = req.wlink
with wlink._rxmu:
assert req.stream in wlink._accepted
wlink._send(req.stream, answer)
with wlink._rxmu:
assert req.stream in wlink._accepted
wlink._accepted.remove(req.stream)
# XXX also track as answered? (and don't accept with the same ID ?)
def _parse(req): # -> (foid, blk, at|None)
# pin <foid> #<blk> @(<at>|head)
m = re.match(b"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(?P<at>[^ ]+)$", req.msg)
if m is None:
raise RuntimeError("message is not valid pin request: %s" % qq(req.msg))
foid = fromhex(m.group('foid'))
blk = int(m.group('blk'))
at = m.group('at')
if at == "head":
at = None
else:
at = fromhex(at)
return foid, blk, at
@property
def foid(req): return req._parse()[0]
@property
def blk(req): return req._parse()[1]
@property
def at(req): return req._parse()[2]
# ---- WCFS raw file access ---- # ---- WCFS raw file access ----
......
...@@ -33,7 +33,7 @@ from persistent import Persistent ...@@ -33,7 +33,7 @@ from persistent import Persistent
from persistent.timestamp import TimeStamp from persistent.timestamp import TimeStamp
from ZODB.utils import z64, u64, p64 from ZODB.utils import z64, u64, p64
import sys, os, os.path, subprocess, threading, inspect, traceback, re import sys, os, os.path, subprocess, threading, inspect, traceback
from thread import get_ident as gettid from thread import get_ident as gettid
from time import gmtime from time import gmtime
from errno import EINVAL, ENOENT, ENOTCONN from errno import EINVAL, ENOENT, ENOTCONN
...@@ -731,96 +731,22 @@ class tWatchLink(wcfs.WatchLink): ...@@ -731,96 +731,22 @@ class tWatchLink(wcfs.WatchLink):
t.tdb._wlinks.remove(t) t.tdb._wlinks.remove(t)
super(tWatchLink, t).close() super(tWatchLink, t).close()
# recvReq is the same as WatchLink.recvReq but returns tSrvReq instead of SrvReq.
# ---- message IO ----
# sendReq sends client -> server request and returns server reply.
#
# only 1 sendReq must be used at a time. # XXX relax?
def sendReq(t, ctx, req): # reply | None when EOF
rxq = t._sendReq(ctx, req)
_, _rx = select(
ctx.done().recv, # 0
rxq.recv, # 1
)
if _ == 0:
raise ctx.err()
return _rx
def _sendReq(t, ctx, req): # -> rxq
stream = 1 # XXX -> dynamic
rxq = chan()
with t._rxmu:
assert stream not in t._rxtab
t._rxtab[stream] = rxq
t._send(stream, req)
return rxq
# recvReq receives client <- server request.
#
# multiple recvReq could be used at a time.
def recvReq(t, ctx): # -> tSrvReq | None when EOF def recvReq(t, ctx): # -> tSrvReq | None when EOF
_, _rx = select( req = super(tWatchLink, t).recvReq(ctx)
ctx.done().recv, # 0 if req is not None:
t._acceptq.recv, # 1 assert req.__class__ is wcfs.SrvReq
) req.__class__ = tSrvReq
if _ == 0: return req
raise ctx.err()
class tSrvReq(wcfs.SrvReq):
rx = _rx # _parse is the same as SrvReq._parse, but returns at wrapped with tAt.
if rx is None:
return rx
stream, msg = rx
return tSrvReq(t, stream, msg)
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch link.
class tSrvReq:
def __init__(req, twlink, stream, msg):
req.twlink = twlink
req.stream = stream
req.msg = msg
def reply(req, answer):
#print('C: reply %s <- %r ...' % (req, answer))
t = req.twlink
with t._rxmu:
assert req.stream in t._accepted
t._send(req.stream, answer)
with t._rxmu:
assert req.stream in t._accepted
t._accepted.remove(req.stream)
# XXX also track as answered? (and don't accept with the same ID ?)
def _parse(req): # -> (foid, blk, at|None) def _parse(req): # -> (foid, blk, at|None)
# pin <foid> #<blk> @(<at>|head) foid, blk, at = super(tSrvReq, req)._parse()
m = re.match(b"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(?P<at>[^ ]+)$", req.msg) if at is not None:
if m is None: at = tAt(req.wlink.tdb, at)
raise RuntimeError("message is not valid pin request: %s" % qq(req.msg))
foid = fromhex(m.group('foid'))
blk = int(m.group('blk'))
at = m.group('at')
if at == "head":
at = None
else:
at = tAt(req.twlink.tdb, fromhex(at))
return foid, blk, at return foid, blk, at
@property
def foid(req): return req._parse()[0]
@property
def blk(req): return req._parse()[1]
@property
def at(req): return req._parse()[2]
# ---- infrastructure: watch setup/adjust ---- # ---- infrastructure: watch setup/adjust ----
...@@ -1031,7 +957,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str) ...@@ -1031,7 +957,7 @@ def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
# expect is {} blk -> at # expect is {} blk -> at
# returns [] of received pin requests. # returns [] of received pin requests.
@func(tWatchLink) @func(tWatchLink)
def _expectPin(twlink, ctx, zf, expect): # -> []tSrvReq def _expectPin(twlink, ctx, zf, expect): # -> []SrvReq
expected = set() # of expected pin messages expected = set() # of expected pin messages
for blk, at in expect.items(): for blk, at in expect.items():
hat = h(at) if at is not None else 'head' hat = h(at) if at is not None else 'head'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment