Commit 3ba362ba authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c2e56e80
......@@ -155,17 +155,15 @@ struct _Mapping {
// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;
// rxPkt internally represents data of one message sent/received over WatchLink.
// XXX recv only ok?
// XXX place?
// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
// stream over which the data was received; XXX kill: used internally by send
// stream over which the data was received
StreamID stream;
// raw data received/to-be-sent.
// XXX not e.g. string as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit line length to avoid DoS
// but just for DoS the limit would be higher.
// non-trivial copy. Note: we anyway need to limit rx line length to
// avoid DoS, but just for DoS the limit would be higher.
uint16_t datalen;
char data[128 - sizeof(StreamID) - sizeof(uint16_t)];
......@@ -209,7 +207,7 @@ class WatchLink {
public:
friend tuple<WatchLink*, error> WCFS::_openwatch();
error close();
SrvReq *recvReq(IContext *ctx);
error recvReq(IContext *ctx, SrvReq *rx_into);
tuple<string, error> sendReq(IContext *ctx, const string &req);
private:
......@@ -227,7 +225,7 @@ struct SrvReq {
StreamID stream; // request was received with this stream ID
Oid foid; // request is about this file
int64_t blk; // ----//---- about this block
Tid at; // pin to this at XXX ffff = None = unpin
Tid at; // pin to this at; TidHead means unpin to head
};
......@@ -260,6 +258,31 @@ tuple<Conn*, error> WCFS::connect(Tid at) {
}
// XXX Conn::close
#if 0
# close releases resources associated with wconn.
# XXX what happens to file mmappings?
@func(Conn)
def close(wconn):
wconn._wlink.close()
wconn._pinCancel()
try:
wconn._pinWG.wait()
except Exception as e: # canceled - ok
if e is not context.canceled:
raise
# close all files - both that have no mappings and that still have opened mappings.
# XXX after file is closed mappings continue to survive, but we can no
# longer maintain consistent view.
with wconn._filemu:
for f in wconn._filetab.values():
f.headf.close()
f.headf = None
# XXX stop watching f
wconn._filetab = None
#endif
// _pinner receives pin messages from wcfs and adjusts wconn mappings.
void Conn::_pinner(IContext *ctx) {
......@@ -269,6 +292,7 @@ void Conn::_pinner(IContext *ctx) {
while (1) {
// XXX -> recv inplace into on-stack req ?
// XXX -> err, handle EOF, abort on *
SrvReq *req = wconn._wlink->recvReq(ctx);
if (req == NULL)
return // XXX ok? (EOF - when wcfs closes wlink)
......@@ -327,6 +351,10 @@ void Conn::_pin1(SrvReq *req) {
wconn._filemu.unlock();
}
// XXX Conn::mmap
// XXX Conn::resync
// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=TidHead means unpin to head/ .
......@@ -383,6 +411,10 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) {
return nil;
}
// XXX _Mapping::remmap_blk
// XXX _Mapping::unmap
// ---- WatchLink ----
......@@ -630,6 +662,28 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string
return make_tuple(rxq, err);
}
// recvReq receives client <- server request.
error WatchLink::recvReq(IContext *ctx, SrvReq *rx) {
WatchLink& wlink = *this;
rxPkt pkt;
_ = select(
ctx->done().recvs(), // 0
wlink._acceptq.recvs(&pkt), // 1
)
if (_ == 0)
return ctx.err();
rx = _rx
if rx is None: // XXX recheck _serveRX vs EOF signalling
return rx
rx.stream = pkt.stream;
pkt.to_sting();
stream, msg = rx
return SrvReq(wlink, stream, msg)
}
// _readline reads next raw line sent from wcfs.
tuple<string, error> WatchLink::_readline() {
WatchLink& wlink = *this;
......@@ -639,8 +693,8 @@ tuple<string, error> WatchLink::_readline() {
while (1) {
auto nl = wlink._rxbuf.find('\n', nl_searchfrom);
if (nl != string::npos) {
auto line = string(wlink._rxbuf, 0, nl+1);
wlink._rxbuf = string(wlink._rxbuf, nl+1);
auto line = wlink._rxbuf.substr(0, nl+1);
wlink._rxbuf = wlink._rxbuf.substr(nl+1);
return make_tuple(line, nil);
}
nl_searchfrom = wlink._rxbuf.length();
......@@ -669,8 +723,8 @@ error rxPkt::from_string(const string &rx) {
auto sp = rx.find(' ');
if (sp == string::npos)
return fmt::errorf("invalid pkt: no SP");
string sid = string(rx, 0, sp);
string smsg = string(rx, sp+1);
string sid = rx.substr(0, sp);
string smsg = rx.substr(sp+1);
sscanf(sid.c_str(), "%" SCNu64, &pkt.stream);
if (std::to_string(pkt.stream) != sid)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment