Commit 8f706761 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e328d422
...@@ -79,7 +79,7 @@ static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset); ...@@ -79,7 +79,7 @@ static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);
// connect creates new Conn viewing WCFS state as of @at. // connect creates new Conn viewing WCFS state as of @at.
pair<Conn, error> WCFS::connect(zodb::Tid at) { pair<Conn, error> WCFS::connect(zodb::Tid at) {
WCFS *wc = this; WCFS *wc = this;
// XXX err ctx // XXX errctx "connect @%s", at
// TODO support !isolated mode // TODO support !isolated mode
...@@ -108,20 +108,26 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -108,20 +108,26 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
// XXX what happens to file mmappings? // XXX what happens to file mmappings?
error _Conn::close() { error _Conn::close() {
_Conn& wconn = *this; _Conn& wconn = *this;
// XXX err ctx // XXX errctx "close conn @%s", wconn.at
error err; // XXX + conn # e.g. from wconn._wlink.id? or wlink.close should include its id itself?
error err, eret;
auto reterr1 = [&eret](const error& err) {
if (eret == nil && err != nil)
eret = err;
};
err = wconn._wlink->close(); err = wconn._wlink->close();
if (err != nil) if (err != nil)
return err; reterr1(err);
wconn._pinCancel(); wconn._pinCancel();
err = wconn._pinWG->wait(); err = wconn._pinWG->wait();
if (err != context::canceled) // canceled - ok if (err != context::canceled) // canceled - ok
return err; reterr1(err);
// close all files - both that have no mappings and that still have opened mappings. // close all files - both that have no mappings and that still have opened mappings.
// XXX after file is closed mappings continue to survive, but we can no // XXX after file is closed mappings continue to survive, but we can no
// longer maintain consistent view. // longer maintain consistent view.
// XXX change mapping to something that gives EFAULT on access?
wconn._filehmu.lock(); wconn._filehmu.lock();
defer([&]() { defer([&]() {
wconn._filehmu.unlock(); wconn._filehmu.unlock();
...@@ -129,14 +135,16 @@ error _Conn::close() { ...@@ -129,14 +135,16 @@ error _Conn::close() {
for (auto _ : wconn._filehtab) { for (auto _ : wconn._filehtab) {
auto f = _.second; auto f = _.second;
f->_headf->close(); // XXX err err = f->_headf->close();
if (err != nil)
reterr1(err);
f->_headf = nil; f->_headf = nil;
// XXX stop watching f // XXX stop watching f
} }
wconn._filehtab.clear(); wconn._filehtab.clear();
return nil; return eret;
} }
// _pinner receives pin messages from wcfs and adjusts wconn mappings. // _pinner receives pin messages from wcfs and adjusts wconn mappings.
...@@ -184,7 +192,7 @@ void _Conn::_pin1(PinReq *req) { ...@@ -184,7 +192,7 @@ void _Conn::_pin1(PinReq *req) {
string ack = "ack"; string ack = "ack";
if (err != nil) if (err != nil)
ack = fmt::sprintf("nak: %s", err->Error().c_str()); ack = fmt::sprintf("nak: %s", err->Error().c_str());
wconn._wlink->replyReq(context::background(), req, ack); // XXX ctx ok? wconn._wlink->replyReq(context::background(), req, ack); // XXX ctx ok?
} }
error _Conn::__pin1(PinReq *req) { error _Conn::__pin1(PinReq *req) {
...@@ -197,7 +205,7 @@ error _Conn::__pin1(PinReq *req) { ...@@ -197,7 +205,7 @@ error _Conn::__pin1(PinReq *req) {
if (!ok) { if (!ok) {
wconn._filehmu.unlock(); wconn._filehmu.unlock();
// XXX err = we are not watching the file - why wcfs sent us this update? // XXX err = we are not watching the file - why wcfs sent us this update?
return fmt::errorf("err TODO"); return fmt::errorf("err TODO");
} }
......
...@@ -125,7 +125,7 @@ public: ...@@ -125,7 +125,7 @@ public:
private: private:
error _pinner(context::Context ctx); error _pinner(context::Context ctx);
void _pin1(PinReq *req); void _pin1(PinReq *req);
error __pin1(PinReq *req); error __pin1(PinReq *req);
}; };
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
namespace wcfs { namespace wcfs {
// v mimics %v for error
// XXX temp, place, ok=? // XXX temp, place, ok=?
const char *v(error err) { const char *v(error err) {
if (err != nil) if (err != nil)
...@@ -47,6 +48,7 @@ void _WatchLink::decref() { ...@@ -47,6 +48,7 @@ void _WatchLink::decref() {
// _openwatch opens new watch link on wcfs. // _openwatch opens new watch link on wcfs.
pair<WatchLink, error> WCFS::_openwatch() { pair<WatchLink, error> WCFS::_openwatch() {
WCFS *wc = this; WCFS *wc = this;
// XXX errctx += "wcfs %s: openwatch", wc.mountpoint ?
// head/watch handle. // head/watch handle.
os::File f; os::File f;
...@@ -95,7 +97,7 @@ error _WatchLink::closeWrite() { ...@@ -95,7 +97,7 @@ error _WatchLink::closeWrite() {
// close closes the link. // close closes the link.
error _WatchLink::close() { error _WatchLink::close() {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
// XXX err ctx? // XXX errctx?
error err = wlink.closeWrite(); error err = wlink.closeWrite();
wlink._serveCancel(); wlink._serveCancel();
...@@ -249,7 +251,7 @@ error _WatchLink::_write(const string &pkt) { ...@@ -249,7 +251,7 @@ error _WatchLink::_write(const string &pkt) {
// XXX -> reply | None when EOF // XXX -> reply | None when EOF
pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) { pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) {
_WatchLink *wlink = this; _WatchLink *wlink = this;
// XXX err ctx // XXX errctx
//printf("wlink sendReq '%s'\n", req.c_str()); //printf("wlink sendReq '%s'\n", req.c_str());
...@@ -278,7 +280,7 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) ...@@ -278,7 +280,7 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req)
tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) { tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) {
_WatchLink *wlink = this; _WatchLink *wlink = this;
// XXX err ctx? // XXX errctx?
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock) wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next; StreamID stream = wlink->_req_next;
...@@ -317,7 +319,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons ...@@ -317,7 +319,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
// XXX document EOF. // XXX document EOF.
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) { error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink *wlink = this; _WatchLink *wlink = this;
// XXX err ctx? // XXX errctx?
//print('C: reply %s <- %r ...' % (req, answer)) //print('C: reply %s <- %r ...' % (req, answer))
wlink->_rxmu.lock(); wlink->_rxmu.lock();
...@@ -345,7 +347,7 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string ...@@ -345,7 +347,7 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string
static error _parsePinReq(PinReq *pin, const rxPkt *pkt); static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
// XXX err ctx? // XXX errctx?
rxPkt pkt; rxPkt pkt;
bool ok; bool ok;
...@@ -364,7 +366,7 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { ...@@ -364,7 +366,7 @@ error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
// _parsePinReq parses message into PinReq according to wcfs invalidation protocol. // _parsePinReq parses message into PinReq according to wcfs invalidation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) { static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
// XXX err ctx "bad pin" // XXX errctx "bad pin"
//printf("parse pinreq: stream=%lu msg='%s'\n", pkt->stream, &pkt->data[0]); //printf("parse pinreq: stream=%lu msg='%s'\n", pkt->stream, &pkt->data[0]);
pin->stream = pkt->stream; pin->stream = pkt->stream;
string msg = pkt->to_string(); string msg = pkt->to_string();
......
...@@ -1802,7 +1802,7 @@ def test_wcfs_virtmem(): ...@@ -1802,7 +1802,7 @@ def test_wcfs_virtmem():
assert pinned(fh) == {3:at1} assert pinned(fh) == {3:at1}
tm1.assertBlk(2, 'c2', { 3:at1}) tm1.assertBlk(2, 'c2', { 3:at1})
tm1.assertBlk(3, 'd1', { 3:at1}) tm1.assertBlk(3, 'd1', { 3:at1})
tm1.assertBlk(4, '', { 3:at1, 4:at0}) # XXX 4->ø ? tm1.assertBlk(4, '', { 3:at1, 4:at0}) # XXX at0->ø ?
# resync at2 -> at3: #3 must unpin to @head; #4 - start to read with data # resync at2 -> at3: #3 must unpin to @head; #4 - start to read with data
wconn.resync(at3) wconn.resync(at3)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment