Commit e6e6644a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent efdf1420
...@@ -118,6 +118,12 @@ static global<error> errConnClosed = errors::New("connection closed"); ...@@ -118,6 +118,12 @@ static global<error> errConnClosed = errors::New("connection closed");
// opened fileh and mappings become invalid to use except close and unmap. // opened fileh and mappings become invalid to use except close and unmap.
error _Conn::close() { error _Conn::close() {
_Conn& wconn = *this; _Conn& wconn = *this;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
xerr::Contextf E("wcfs %s: close conn @%s", v(wconn._wc->mountpoint), v(wconn.at)); xerr::Contextf E("wcfs %s: close conn @%s", v(wconn._wc->mountpoint), v(wconn.at));
// XXX + conn # e.g. from wconn._wlink.id? or wlink.close should include its id itself? // XXX + conn # e.g. from wconn._wlink.id? or wlink.close should include its id itself?
// (or ._wlink._f.fd() ?) // (or ._wlink._f.fd() ?)
...@@ -128,15 +134,18 @@ error _Conn::close() { ...@@ -128,15 +134,18 @@ error _Conn::close() {
eret = err; eret = err;
}; };
wconn._downMu.lock(); wconn._mu.lock();
defer([&]() {
wconn._mu.unlock();
});
wconn._downErr = errConnClosed; // XXX ok to change even if it was !nil before? wconn._downErr = errConnClosed; // XXX ok to change even if it was !nil before?
wconn._downMu.unlock();
err = wconn._wlink->close(); err = wconn._wlink->close(); // XXX ok under mu?
if (err != nil) if (err != nil)
reterr1(err); reterr1(err);
wconn._pinCancel(); wconn._pinCancel();
err = wconn._pinWG->wait(); err = wconn._pinWG->wait(); // XXX ok under mu?
if (!errors::Is(err, context::canceled)) // canceled - ok if (!errors::Is(err, context::canceled)) // canceled - ok
reterr1(err); reterr1(err);
...@@ -145,10 +154,6 @@ error _Conn::close() { ...@@ -145,10 +154,6 @@ error _Conn::close() {
// NOTE after file is closed mappings could continue to survive, but we can no // NOTE after file is closed mappings could continue to survive, but we can no
// longer maintain consistent view. For this reason we change mappings to // longer maintain consistent view. For this reason we change mappings to
// something that gives EFAULT on access. XXX implement // something that gives EFAULT on access. XXX implement
wconn._filehmu.lock();
defer([&]() {
wconn._filehmu.unlock();
});
for (auto _ : wconn._filehtab) { for (auto _ : wconn._filehtab) {
auto f = _.second; auto f = _.second;
...@@ -157,7 +162,7 @@ error _Conn::close() { ...@@ -157,7 +162,7 @@ error _Conn::close() {
reterr1(err); reterr1(err);
f->_headf = nil; f->_headf = nil;
// XXX stop watching f // XXX stop watching f XXX ok under mu?
} }
wconn._filehtab.clear(); wconn._filehtab.clear();
...@@ -182,12 +187,12 @@ error _Conn::_pinner(context::Context ctx) { ...@@ -182,12 +187,12 @@ error _Conn::_pinner(context::Context ctx) {
} }
// mark the connection non-operational if the pinner fails // mark the connection non-operational if the pinner fails
wconn._downMu.lock(); wconn._mu.lock(); // XXX locking ok? -> merge into below where lock is held? XXX + atMu.R ?
if (wconn._downErr == nil) { if (wconn._downErr == nil) {
wconn._downErr = fmt::errorf("no longer operational due to: %w", err); wconn._downErr = fmt::errorf("no longer operational due to: %w", err);
// XXX make all fileh and mapping invalid. // XXX make all fileh and mapping invalid.
} }
wconn._downMu.unlock(); wconn._mu.unlock();
return err; return err;
} }
...@@ -204,9 +209,9 @@ error _Conn::__pinner(context::Context ctx) { ...@@ -204,9 +209,9 @@ error _Conn::__pinner(context::Context ctx) {
if (err != nil) { if (err != nil) {
// it is ok if we receive EOF due to us closing the connection // it is ok if we receive EOF due to us closing the connection
if (err == io::EOF_) { if (err == io::EOF_) {
wconn._downMu.lock(); wconn._mu.lock();
err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF; err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF;
wconn._downMu.unlock(); wconn._mu.unlock();
} }
return E(err); return E(err);
} }
...@@ -230,7 +235,8 @@ error _Conn::_pin1(PinReq *req) { ...@@ -230,7 +235,8 @@ error _Conn::_pin1(PinReq *req) {
string ack = "ack"; string ack = "ack";
if (err != nil) if (err != nil)
ack = fmt::sprintf("nak: %s", v(err)); ack = fmt::sprintf("nak: %s", v(err));
error err2 = wconn._wlink->replyReq(context::background(), req, ack); // XXX ctx ok? // NOTE ctx=bg to always send reply even if we are canceled
error err2 = wconn._wlink->replyReq(context::background(), req, ack);
if (err == nil) if (err == nil)
err = err2; err = err2;
...@@ -242,22 +248,28 @@ error _Conn::__pin1(PinReq *req) { ...@@ -242,22 +248,28 @@ error _Conn::__pin1(PinReq *req) {
FileH f; FileH f;
bool ok; bool ok;
wconn._atMu.RLock(); // XXX deadlock wrt Conn.open, Conn.resync ?
defer([&]() {
wconn._atMu.RUnlock();
});
// XXX deadlock wrt Conn.open which locks wconn.filehmu and starts initial "watch" // XXX deadlock wrt Conn.open which locks wconn.filehmu and starts initial "watch"
wconn._filehmu.lock(); wconn._mu.lock();
// XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ? // XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ?
// XXX or just make FileH.close lock f too to synchronizw with pinner?
tie(f, ok) = wconn._filehtab.get_(req->foid); tie(f, ok) = wconn._filehtab.get_(req->foid);
if (!ok) { if (!ok) {
wconn._filehmu.unlock(); wconn._mu.unlock();
// why wcfs sent us this update? // why wcfs sent us this update?
return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid)); return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
} }
// XXX relock wconn -> f wconn._mu.unlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
// wconn.mu.unlock() f._mu.lock();
// f.mu.lock() defer([&]() {
// f._mu.unlock();
// XXX (?) NOTE: _not_ taking wconn.atMu at all (think more / explain why) });
// (e.g. deadlock with Conn.resync (wlocks atMu))
for (auto mmap : f->_mmaps) { // TODO use ↑blk_start for binary search for (auto mmap : f->_mmaps) { // TODO use ↑blk_start for binary search
if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop())) if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
...@@ -302,8 +314,6 @@ error _Conn::__pin1(PinReq *req) { ...@@ -302,8 +314,6 @@ error _Conn::__pin1(PinReq *req) {
} }
// update f._pinned // update f._pinned
// XXX do it before ^^^ remmapblk (so that e.g. concurrent
// discard/writeout see correct f._pinned) ?
if (req->at == TidHead) { if (req->at == TidHead) {
f->_pinned.erase(req->blk); // unpin to @head f->_pinned.erase(req->blk); // unpin to @head
} }
...@@ -311,7 +321,6 @@ error _Conn::__pin1(PinReq *req) { ...@@ -311,7 +321,6 @@ error _Conn::__pin1(PinReq *req) {
f->_pinned[req->blk] = req->at; f->_pinned[req->blk] = req->at;
} }
wconn._filehmu.unlock();
return nil; return nil;
} }
...@@ -324,30 +333,60 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -324,30 +333,60 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
// XXX wconn._atMu.RLock() // XXX wconn._atMu.RLock()
// XXX defer wconn._atMu.RUnlock() // XXX defer wconn._atMu.RUnlock()
wconn._filehmu.lock(); wconn._mu.lock();
defer([&]() {
wconn._filehmu.unlock();
});
if (wconn._downErr != nil) // XXX under filehmu or downMu ? if (wconn._downErr != nil) {
wconn._mu.unlock();
return make_pair(nil, E(wconn._downErr)); return make_pair(nil, E(wconn._downErr));
}
FileH f; bool ok; FileH f; bool ok;
tie(f, ok) = wconn._filehtab.get_(foid); tie(f, ok) = wconn._filehtab.get_(foid);
if (ok) { if (ok) {
wconn._mu.unlock();
f._openReady.recv();
if (f._openErr != nil)
return make_pair(nil, E(f._openErr));
// XXX incref open count // XXX incref open count
return make_pair(f, nil); return make_pair(f, nil);
} }
// TODO perform open with filehmu released // create in-flight-opening FileH entry and perform open with wconn._mu released
f = adoptref(new _FileH()); f = adoptref(new _FileH());
f->wconn = newref(&wconn); f->wconn = newref(&wconn);
f->foid = foid; f->foid = foid;
f->_openReady = makechan<structZ>();
f->_openErr = nil;
bool retok = false;
wconn._filehtab[foid] = f;
defer([&]() {
if (!retok) {
wconn._mu.lock();
wconn._filehtab.erase(foid);
wconn._mu.unlock();
}
f._openReady.close();
});
wconn._mu.unlock();
f._openErr = f._open();
if (f._openErr != nil)
return make_pair(nil, E(f._openErr));
retok = true;
return make_pair(f, nil);
}
error _FileH::_open() {
_FileH* f = this;
tie(f->_headf, err) tie(f->_headf, err)
= wconn._wc->_open(fmt::sprintf("head/bigfile/%s", v(foid))); = wconn._wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
if (err != nil) if (err != nil)
return make_pair(nil, E(err)); return make_pair(nil, err);
bool retok = false; bool retok = false;
defer([&]() { defer([&]() {
...@@ -358,26 +397,19 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -358,26 +397,19 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
struct stat st; struct stat st;
err = f->_headf->stat(&st); err = f->_headf->stat(&st);
if (err != nil) if (err != nil)
return make_pair(nil, E(err)); return make_pair(nil, err);
f->blksize = st.st_blksize; f->blksize = st.st_blksize;
f->_headfsize = st.st_size; f->_headfsize = st.st_size;
if (!(f->_headfsize % f->blksize == 0)) if (!(f->_headfsize % f->blksize == 0))
return make_pair(nil, E(fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0", return make_pair(nil, fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f->_headf->name()), f->_headfsize, f->blksize))); v(f->_headf->name()), f->_headfsize, f->blksize));
wconn._filehtab[foid] = f;
defer([&]() {
if (!retok)
wconn._filehtab.erase(foid);
});
// start watching f // start watching f
// XXX if we start watching with holding either wconn.filehMu or f.Mu, then // NOTE we are _not_ holding wconn.mu nor f.mu XXX wconn.atMu?
// the pinner will deadlock, trying to take wconn.filehMu or f.Mu
string ack; string ack;
tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn.at))); tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn.at)));
if (err != nil) if (err != nil)
return make_pair(nil, E(err)); return make_pair(nil, err);
if (ack != "ok") { if (ack != "ok") {
return make_pair(nil, fmt::errorf("watch: %s", v(ack))); return make_pair(nil, fmt::errorf("watch: %s", v(ack)));
} }
...@@ -616,15 +648,16 @@ error _Conn::resync(zodb::Tid at) { ...@@ -616,15 +648,16 @@ error _Conn::resync(zodb::Tid at) {
// //
// at=TidHead means unpin to head/ . // at=TidHead means unpin to head/ .
// NOTE this does not check whether virtmem already mapped blk as RW. // NOTE this does not check whether virtmem already mapped blk as RW.
//
// The following locks must be held by caller:
// - f.wconn.atMu
// - f._mu
error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) { error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
_Mapping *mmap = this; _Mapping *mmap = this;
FileH f = mmap->fileh; FileH f = mmap->fileh;
xerr::Contextf E("wcfs %s: conn @%s: f<%s>: remmapblk #%ld @%s", xerr::Contextf E("wcfs %s: conn @%s: f<%s>: remmapblk #%ld @%s",
v(f->wconn->_wc->mountpoint), v(f->wconn->at), v(f->foid), blk, v(at)); v(f->wconn->_wc->mountpoint), v(f->wconn->at), v(f->foid), blk, v(at));
// XXX locking done by callers (document)
// XXX cannot use wconn.at ^^^ because pinner does not lock wconn.atMu at all?
ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop()); ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop());
error err; error err;
...@@ -677,16 +710,23 @@ error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) { ...@@ -677,16 +710,23 @@ error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
// RW dirty page was e.g. discarded. // RW dirty page was e.g. discarded.
error _Mapping::remmap_blk(int64_t blk) { error _Mapping::remmap_blk(int64_t blk) {
_Mapping& mmap = *this; _Mapping& mmap = *this;
FileH f = mmap->fileh;
// NOTE virtmem lock is held by virtmem caller // NOTE virtmem lock is held by virtmem caller
// XXX locking // XXX locking ok?
f.wconn._atMu.RLock();
f._mu.lock();
defer([&]() {
f._mu.unlock();
f.wconn._atMu.RUnlock();
});
if (!(mmap.blk_start <= blk && blk < mmap.blk_stop())) if (!(mmap.blk_start <= blk && blk < mmap.blk_stop()))
panic("remmap_blk: blk out of Mapping range"); panic("remmap_blk: blk out of Mapping range");
// blkrev = rev | @head // blkrev = rev | @head
zodb::Tid blkrev; bool ok; zodb::Tid blkrev; bool ok;
tie(blkrev, ok) = mmap.fileh->_pinned.get_(blk); tie(blkrev, ok) = f->_pinned.get_(blk);
if (!ok) if (!ok)
blkrev = TidHead; blkrev = TidHead;
......
...@@ -174,17 +174,17 @@ struct _Conn : object { ...@@ -174,17 +174,17 @@ struct _Conn : object {
// viewing the database at particular state. .resync write-locks this and // viewing the database at particular state. .resync write-locks this and
// knows noone is using the connection for reading simultaneously. // knows noone is using the connection for reading simultaneously.
// //
// XXX deadlock with pinner? // XXX deadlock with pinner? -> use deadlock-avoidance similar to zwatcher in wcfs.go
// sync::RWMutex _atMu
// sync::RWMutex _atMu
zodb::Tid at; zodb::Tid at;
// XXX kill downMu? (move under filehmu so that e.g. .open() can check downErr without race) // XXX kill downMu? (move under filehmu so that e.g. .open() can check downErr without race)
sync::Mutex _downMu; // sync::Mutex _downMu;
error _downErr; // !nil if connection is closed or no longer operational // error _downErr; // !nil if connection is closed or no longer operational
sync::Mutex _filehmu; // _atMu.W | _atMu.R + _filehMu sync::Mutex _mu; // _atMu.W | _atMu.R + _mu
dict<zodb::Oid, FileH> _filehtab; // {} foid -> fileh error _downErr; // !nil if connection is closed or no longer operational
dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh
sync::WorkGroup _pinWG; // pin/unpin messages from wcfs are served by _pinner sync::WorkGroup _pinWG; // pin/unpin messages from wcfs are served by _pinner
func<void()> _pinCancel; // spawned under _pinWG. func<void()> _pinCancel; // spawned under _pinWG.
...@@ -225,8 +225,11 @@ struct _FileH : object { ...@@ -225,8 +225,11 @@ struct _FileH : object {
size_t blksize; // block size of this file (does not change after fileh open) size_t blksize; // block size of this file (does not change after fileh open)
os::File _headf; // file object of head/file os::File _headf; // file object of head/file
off_t _headfsize; // head/file size is known to be at least headfsize (size ↑=) // head/file size is known to be at least headfsize (size ↑=)
// protected by .wconn._atMu
off_t _headfsize;
sync::Mutex _mu; // atMu.W | atMu.R + _mu
dict<int64_t, zodb::Tid> _pinned; // {} blk -> rev that wcfs already sent us for this file dict<int64_t, zodb::Tid> _pinned; // {} blk -> rev that wcfs already sent us for this file
vector<Mapping> _mmaps; // []Mapping ↑blk_start mappings of this file vector<Mapping> _mmaps; // []Mapping ↑blk_start mappings of this file
...@@ -248,7 +251,7 @@ public: ...@@ -248,7 +251,7 @@ public:
// The mapped memory is [.mem_start, .mem_stop) // The mapped memory is [.mem_start, .mem_stop)
// Use .unmap to release virtual memory resources used by mapping. // Use .unmap to release virtual memory resources used by mapping.
// //
// It is safe to use Mapping from multiple threads simultaneously. // Except unmap, it is safe to use Mapping from multiple threads simultaneously.
typedef refptr<struct _Mapping> Mapping; typedef refptr<struct _Mapping> Mapping;
struct _Mapping : object { struct _Mapping : object {
FileH fileh; FileH fileh;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment