Commit a4a66607 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 10241e3f
...@@ -52,6 +52,7 @@ ...@@ -52,6 +52,7 @@
#include <wendelin/bigfile/virtmem.h> #include <wendelin/bigfile/virtmem.h>
#include <wendelin/bigfile/ram.h> #include <wendelin/bigfile/ram.h>
#include <golang/errors.h>
#include <golang/fmt.h> #include <golang/fmt.h>
#include <algorithm> #include <algorithm>
...@@ -102,6 +103,9 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -102,6 +103,9 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
return make_pair(wconn, nil); return make_pair(wconn, nil);
} }
static global<error> errConnClosed = errors::New("connection closed");
// close releases resources associated with wconn. // close releases resources associated with wconn.
// //
// opened fileh and mappings becomes invalid to use. // opened fileh and mappings becomes invalid to use.
...@@ -118,12 +122,16 @@ error _Conn::close() { ...@@ -118,12 +122,16 @@ error _Conn::close() {
eret = err; eret = err;
}; };
wconn._downMu.lock();
wconn._downErr = errConnClosed; // XXX ok to change even if it was !nil before?
wconn._downMu.unlock();
err = wconn._wlink->close(); err = wconn._wlink->close();
if (err != nil) if (err != nil)
reterr1(err); reterr1(err);
wconn._pinCancel(); wconn._pinCancel();
err = wconn._pinWG->wait(); err = wconn._pinWG->wait();
if (err != context::canceled) // canceled - ok if (err != context::canceled) // canceled - ok XXX check ok?
reterr1(err); reterr1(err);
// close all files - both that have no mappings and that still have opened mappings. // close all files - both that have no mappings and that still have opened mappings.
...@@ -152,6 +160,20 @@ error _Conn::close() { ...@@ -152,6 +160,20 @@ error _Conn::close() {
// _pinner receives pin messages from wcfs and adjusts wconn mappings. // _pinner receives pin messages from wcfs and adjusts wconn mappings.
error _Conn::_pinner(context::Context ctx) { error _Conn::_pinner(context::Context ctx) {
_Conn& wconn = *this;
error err = wconn.__pinner(ctx);
// mark the connection non-operational if the pinner fails
wconn._downMu.lock();
if (wconn._downErr == nil)
wconn._downErr = fmt::errorf("no longer operational due to: %s", v(err));
wconn._downMu.unlock();
return err;
}
error _Conn::__pinner(context::Context ctx) {
_Conn& wconn = *this; _Conn& wconn = *this;
xerr::Contextf E("pinner"); // NOTE pinner error goes to Conn::close who has its own context xerr::Contextf E("pinner"); // NOTE pinner error goes to Conn::close who has its own context
...@@ -177,8 +199,13 @@ error _Conn::_pinner(context::Context ctx) { ...@@ -177,8 +199,13 @@ error _Conn::_pinner(context::Context ctx) {
while (1) { while (1) {
err = wconn._wlink->recvReq(ctx, &req); err = wconn._wlink->recvReq(ctx, &req);
if (err != nil) { if (err != nil) {
// XXX -> err, handle EOF, abort on * // it is ok if we receive EOF due to us closing the connection
return E(err); // XXX ok? (EOF - when wcfs closes wlink) if (err == io::EOF_) {
wconn._downMu.lock();
err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF;
wconn._downMu.unlock();
}
return E(err);
} }
// we received request to pin/unpin file block. handle it // we received request to pin/unpin file block. handle it
...@@ -273,6 +300,9 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -273,6 +300,9 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
wconn._filehmu.unlock(); wconn._filehmu.unlock();
}); });
if (wconn._downErr != nil)
return make_pair(nil, E(wconn._downErr));
FileH f; bool ok; FileH f; bool ok;
tie(f, ok) = wconn._filehtab.get(foid); tie(f, ok) = wconn._filehtab.get(foid);
if (f != nil) { if (f != nil) {
...@@ -395,6 +425,9 @@ error _Conn::resync(zodb::Tid at) { ...@@ -395,6 +425,9 @@ error _Conn::resync(zodb::Tid at) {
// XXX locking // XXX locking
if (wconn._downErr != nil) // XXX lock with _filehmu
return E(wconn._downErr);
for (auto fit : wconn._filehtab) { for (auto fit : wconn._filehtab) {
zodb::Oid foid = fit.first; zodb::Oid foid = fit.first;
FileH f = fit.second; FileH f = fit.second;
......
...@@ -104,6 +104,9 @@ struct _Conn : object { ...@@ -104,6 +104,9 @@ struct _Conn : object {
zodb::Tid at; zodb::Tid at;
WatchLink _wlink; // watch/receive pins for mappings created under this conn WatchLink _wlink; // watch/receive pins for mappings created under this conn
sync::Mutex _downMu;
error _downErr; // !nil if connection is closed or no longer operational
sync::Mutex _filehmu; sync::Mutex _filehmu;
dict<zodb::Oid, FileH> _filehtab; // {} foid -> fileh dict<zodb::Oid, FileH> _filehtab; // {} foid -> fileh
...@@ -125,6 +128,7 @@ public: ...@@ -125,6 +128,7 @@ public:
private: private:
error _pinner(context::Context ctx); error _pinner(context::Context ctx);
error __pinner(context::Context ctx);
error _pin1(PinReq *req); error _pin1(PinReq *req);
error __pin1(PinReq *req); error __pin1(PinReq *req);
}; };
...@@ -139,7 +143,7 @@ private: ...@@ -139,7 +143,7 @@ private:
typedef refptr<struct _FileH> FileH; typedef refptr<struct _FileH> FileH;
struct _FileH : object { struct _FileH : object {
Conn wconn; Conn wconn;
zodb::Oid foid; // ZBigFile root object ID zodb::Oid foid; // ZBigFile root object ID
size_t blksize; // block size of this file XXX -> off_t ? size_t blksize; // block size of this file XXX -> off_t ?
os::File _headf; // file object of head/file os::File _headf; // file object of head/file
off_t _headfsize; // head/file size is known to be at least headfsize (size ↑=) off_t _headfsize; // head/file size is known to be at least headfsize (size ↑=)
......
...@@ -311,7 +311,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons ...@@ -311,7 +311,7 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
// replyReq sends reply to client <- server request received by recvReq. // replyReq sends reply to client <- server request received by recvReq.
// //
// XXX document EOF. // XXX document EOF. XXX -> no EOF here - only ErrUnexpectedEOF
error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) { error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string& answer) {
_WatchLink *wlink = this; _WatchLink *wlink = this;
xerr::Contextf E("wlink X: replyReq"); // XXX +wlink details xerr::Contextf E("wlink X: replyReq"); // XXX +wlink details
...@@ -339,6 +339,8 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string ...@@ -339,6 +339,8 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string
} }
// recvReq receives client <- server request. // recvReq receives client <- server request.
//
// it returns EOF when server closes the link.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt); static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error _WatchLink::recvReq(context::Context ctx, PinReq *prx) { error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
......
...@@ -1821,6 +1821,9 @@ def test_wcfs_virtmem(): ...@@ -1821,6 +1821,9 @@ def test_wcfs_virtmem():
# XXX fh close then open again and use # XXX fh close then open again and use
# XXX wconn.open() after wconn.close() -> error
# XXX wconn.resync() after wconn.close() -> error
# ---- misc --- # ---- misc ---
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment