Commit a41fe05a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d41689f6
...@@ -257,10 +257,10 @@ error _Conn::close() { ...@@ -257,10 +257,10 @@ error _Conn::close() {
}; };
bool alreadyClosed = false; bool alreadyClosed = false;
wconn._mu.Lock(); wconn._filehMu.Lock();
alreadyClosed = (wconn._downErr == errConnClosed); alreadyClosed = (wconn._downErr == errConnClosed);
wconn._downErr = errConnClosed; wconn._downErr = errConnClosed;
wconn._mu.Unlock(); wconn._filehMu.Unlock();
if (alreadyClosed) if (alreadyClosed)
return nil; return nil;
...@@ -279,9 +279,9 @@ error _Conn::close() { ...@@ -279,9 +279,9 @@ error _Conn::close() {
// NOTE after file is closed mappings could continue to survive, but we can no // NOTE after file is closed mappings could continue to survive, but we can no
// longer maintain consistent view. For this reason we change mappings to // longer maintain consistent view. For this reason we change mappings to
// something that gives EFAULT on access. XXX implement // something that gives EFAULT on access. XXX implement
wconn._mu.Lock(); wconn._filehMu.Lock();
defer([&]() { defer([&]() {
wconn._mu.Unlock(); wconn._filehMu.Unlock();
}); });
// XXX f locking // XXX f locking
...@@ -319,13 +319,13 @@ error _Conn::_pinner(context::Context ctx) { ...@@ -319,13 +319,13 @@ error _Conn::_pinner(context::Context ctx) {
// mark the connection non-operational if the pinner fails // mark the connection non-operational if the pinner fails
// XXX deadlock wrt resync? (who read-locks wconn.mu) // XXX deadlock wrt resync? (who read-locks wconn.mu)
// XXX -> mu -> downMu ? // XXX -> mu -> downMu ?
wconn._mu.Lock(); // XXX locking ok? -> merge into below where lock is held? wconn._filehMu.Lock(); // XXX locking ok? -> merge into below where lock is held?
if (wconn._downErr == nil) { if (wconn._downErr == nil) {
wconn._downErr = fmt::errorf("no longer operational due to: %w", wconn._downErr = fmt::errorf("no longer operational due to: %w",
err != nil ? err : fmt::errorf("pinner exit")); err != nil ? err : fmt::errorf("pinner exit"));
// XXX make all fileh and mapping invalid. // XXX make all fileh and mapping invalid.
} }
wconn._mu.Unlock(); wconn._filehMu.Unlock();
return err; return err;
} }
...@@ -342,9 +342,9 @@ error _Conn::__pinner(context::Context ctx) { ...@@ -342,9 +342,9 @@ error _Conn::__pinner(context::Context ctx) {
if (err != nil) { if (err != nil) {
// it is ok if we receive EOF due to us (client) closing the connection // it is ok if we receive EOF due to us (client) closing the connection
if (err == io::EOF_) { if (err == io::EOF_) {
wconn._mu.RLock(); wconn._filehMu.RLock();
err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF; err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF;
wconn._mu.RUnlock(); wconn._filehMu.RUnlock();
} }
return E(err); return E(err);
} }
...@@ -388,19 +388,19 @@ error _Conn::__pin1(PinReq *req) { ...@@ -388,19 +388,19 @@ error _Conn::__pin1(PinReq *req) {
}); });
// XXX deadlock wrt Conn.resync which locks wconn.mu and does "watch" ? // XXX deadlock wrt Conn.resync which locks wconn.mu and does "watch" ?
wconn._mu.RLock(); wconn._filehMu.RLock();
// XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ? // XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ?
// XXX or just make FileH.close lock f too to synchronize with pinner? // XXX or just make FileH.close lock f too to synchronize with pinner?
tie(f, ok) = wconn._filehTab.get_(req->foid); tie(f, ok) = wconn._filehTab.get_(req->foid);
if (!ok) { if (!ok) {
wconn._mu.RUnlock(); wconn._filehMu.RUnlock();
// why wcfs sent us this update? // why wcfs sent us this update?
return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid)); return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
} }
// XXX <- f._openReady ? // XXX <- f._openReady ?
wconn._mu.RUnlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close? wconn._filehMu.RUnlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
f->_mu.lock(); f->_mu.lock();
defer([&]() { defer([&]() {
f->_mu.unlock(); f->_mu.unlock();
...@@ -441,7 +441,7 @@ error _Conn::__pin1(PinReq *req) { ...@@ -441,7 +441,7 @@ error _Conn::__pin1(PinReq *req) {
// on error don't need to continue with other mappings - all fileh and // on error don't need to continue with other mappings - all fileh and
// all mappings become marked invalid on pinner failure. // all mappings become marked invalid on pinner failure.
// XXX all call wconn._down from here under wconn._mu lock? // XXX all call wconn._down from here under wconn._filehMu lock?
if (err != nil) if (err != nil)
return err; return err;
...@@ -485,13 +485,13 @@ error _Conn::resync(zodb::Tid at) { ...@@ -485,13 +485,13 @@ error _Conn::resync(zodb::Tid at) {
// particular _pinner_, from running and mutating files and mappings. // particular _pinner_, from running and mutating files and mappings.
// //
// NOTE we'll relock atMu as R in the second part of resync, so we prelock // NOTE we'll relock atMu as R in the second part of resync, so we prelock
// wconn._mu.R as well while under atMu.W, to be sure that set of opened // wconn._filehMu.R as well while under atMu.W, to be sure that set of opened
// files stays the same during whole resync. // files stays the same during whole resync.
bool atMuWLocked = true; bool atMuWLocked = true;
wconn._atMu.Lock(); wconn._atMu.Lock();
wconn._mu.RLock(); wconn._filehMu.RLock();
defer([&]() { defer([&]() {
wconn._mu.RUnlock(); wconn._filehMu.RUnlock();
if (atMuWLocked) if (atMuWLocked)
wconn._atMu.Unlock(); wconn._atMu.Unlock();
else else
...@@ -511,7 +511,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -511,7 +511,7 @@ error _Conn::resync(zodb::Tid at) {
// set new wconn.at early, so that e.g. Conn.open running simultaneously // set new wconn.at early, so that e.g. Conn.open running simultaneously
// to second part of resync (see below) uses new at. // to second part of resync (see below) uses new at.
// XXX no need since wconn._mu is locked? -> no - it is *needed* after wconn.mu became RWMutex // XXX no need since wconn._filehMu is locked? -> no - it is *needed* after wconn.mu became RWMutex
wconn.at = at; wconn.at = at;
// go through all files opened under wconn and pre-adjust their mappings // go through all files opened under wconn and pre-adjust their mappings
...@@ -569,15 +569,15 @@ error _Conn::resync(zodb::Tid at) { ...@@ -569,15 +569,15 @@ error _Conn::resync(zodb::Tid at) {
// - we need to hold atMu.R to avoid race wrt e.g. other resync which changes at. // - we need to hold atMu.R to avoid race wrt e.g. other resync which changes at.
// - we cannot just do regular `atMu.Unlock + atMu.RLock()` because then // - we cannot just do regular `atMu.Unlock + atMu.RLock()` because then
// there is e.g. a race window in between Unlock and RLock where wconn.at can be changed. // there is e.g. a race window in between Unlock and RLock where wconn.at can be changed.
// XXX also deadlock, because it will become wconn._mu.lock + wconn._atMu lock // XXX also deadlock, because it will become wconn._filehMu.lock + wconn._atMu lock
// //
// Now other calls, e.g. Conn.open, can be running simultaneously to us, // Now other calls, e.g. Conn.open, can be running simultaneously to us,
// but since we already set wconn.at to new value it is ok. For example // but since we already set wconn.at to new value it is ok. For example
// Conn.open, for not-yet-opened file, will use new at to send "watch". // Conn.open, for not-yet-opened file, will use new at to send "watch".
// XXX ^^^ not possible since wconn._mu is locked ? // XXX ^^^ not possible since wconn._filehMu is locked ?
// -> no, possible, wconn._mu became RWMutex // -> no, possible, wconn._filehMu became RWMutex
// //
// XXX we are still holding wconn._mu.R, so wconn._filehTab is the // XXX we are still holding wconn._filehMu.R, so wconn._filehTab is the
// same as in previous pass above. // same as in previous pass above.
wconn._atMu.UnlockToRLock(); wconn._atMu.UnlockToRLock();
atMuWLocked = false; atMuWLocked = false;
...@@ -592,7 +592,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -592,7 +592,7 @@ error _Conn::resync(zodb::Tid at) {
zodb::Oid foid = fit.first; zodb::Oid foid = fit.first;
FileH f = fit.second; FileH f = fit.second;
// XXX locking ok? (protected by wconn._mu ?) // XXX locking ok? (protected by wconn._filehMu ?)
if (f->_state != _FileHOpened) if (f->_state != _FileHOpened)
continue; continue;
...@@ -623,11 +623,11 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -623,11 +623,11 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid)); xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));
wconn._mu.Lock(); wconn._filehMu.Lock();
if (wconn._downErr != nil) { if (wconn._downErr != nil) {
err = wconn._downErr; err = wconn._downErr;
wconn._mu.Unlock(); wconn._filehMu.Unlock();
return make_pair(nil, E(err)); return make_pair(nil, E(err));
} }
...@@ -643,7 +643,7 @@ retry: ...@@ -643,7 +643,7 @@ retry:
} else { } else {
closing = true; closing = true;
} }
wconn._mu.Unlock(); wconn._filehMu.Unlock();
// if the file was closing|closed, we should wait for the close to // if the file was closing|closed, we should wait for the close to
// complete and retry the open. // complete and retry the open.
...@@ -663,7 +663,7 @@ retry: ...@@ -663,7 +663,7 @@ retry:
return make_pair(f, nil); return make_pair(f, nil);
} }
// create "opening" FileH entry and perform open with wconn._mu released // create "opening" FileH entry and perform open with wconn._filehMu released
// NOTE wconn._atMu.R is still held because FileH._open relies on wconn.at being stable. // NOTE wconn._atMu.R is still held because FileH._open relies on wconn.at being stable.
f = adoptref(new _FileH()); f = adoptref(new _FileH());
f->wconn = newref(&wconn); f->wconn = newref(&wconn);
...@@ -680,9 +680,9 @@ retry: ...@@ -680,9 +680,9 @@ retry:
bool retok = false; bool retok = false;
wconn._filehTab[foid] = f; wconn._filehTab[foid] = f;
defer([&]() { defer([&]() {
wconn._mu.Lock(); wconn._filehMu.Lock();
if (wconn._filehTab.get(foid) != f) { if (wconn._filehTab.get(foid) != f) {
wconn._mu.Unlock(); wconn._filehMu.Unlock();
panic("BUG: wconn.open: wconn.filehTab[foid] mutated while file open was in progress"); panic("BUG: wconn.open: wconn.filehTab[foid] mutated while file open was in progress");
} }
if (!retok) { if (!retok) {
...@@ -691,10 +691,10 @@ retry: ...@@ -691,10 +691,10 @@ retry:
} else { } else {
f->_state = _FileHOpened; f->_state = _FileHOpened;
} }
wconn._mu.Unlock(); wconn._filehMu.Unlock();
f->_openReady.close(); f->_openReady.close();
}); });
wconn._mu.Unlock(); wconn._filehMu.Unlock();
f->_openErr = f->_open(); f->_openErr = f->_open();
if (f->_openErr != nil) if (f->_openErr != nil)
...@@ -760,9 +760,9 @@ error _FileH::close() { ...@@ -760,9 +760,9 @@ error _FileH::close() {
Conn wconn = fileh.wconn; Conn wconn = fileh.wconn;
wconn->_atMu.RLock(); wconn->_atMu.RLock();
wconn->_mu.Lock() wconn->_filehMu.Lock();
defer([&]() { defer([&]() {
wconn->_mu.Unlock(); wconn->_filehMu.Unlock();
wconn->_atMu.RUnlock(); wconn->_atMu.RUnlock();
}); });
...@@ -785,7 +785,7 @@ error _FileH::close() { ...@@ -785,7 +785,7 @@ error _FileH::close() {
// unlock wconn.mu to stop watching outside of this lock. // unlock wconn.mu to stop watching outside of this lock.
// we'll relock it again before updating wconn.filehTab. // we'll relock it again before updating wconn.filehTab.
wconn->_mu.Unlock(); wconn->_filehMu.Unlock();
error err, eret; error err, eret;
...@@ -806,8 +806,8 @@ error _FileH::close() { ...@@ -806,8 +806,8 @@ error _FileH::close() {
reterr1(fmt::errorf("unwatch: %s", v(ack))); reterr1(fmt::errorf("unwatch: %s", v(ack)));
// relock wconn._mu again and remove fileh from wconn._filehTab // relock wconn._filehMu again and remove fileh from wconn._filehTab
wconn->_mu.Lock(); wconn->_filehMu.Lock();
if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh) if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh)
panic("BUG: fileh.close: wconn.filehTab[fileh.foid] != fileh"); panic("BUG: fileh.close: wconn.filehTab[fileh.foid] != fileh");
wconn->_filehTab.erase(fileh.foid); wconn->_filehTab.erase(fileh.foid);
......
...@@ -181,7 +181,7 @@ struct _Conn : object { ...@@ -181,7 +181,7 @@ struct _Conn : object {
sync::RWMutex _atMu; sync::RWMutex _atMu;
zodb::Tid at; zodb::Tid at;
sync::RWMutex _mu; // _atMu.W | _atMu.R + _mu XXX -> _filehMu ? -> y sync::RWMutex _filehMu; // _atMu.W | _atMu.R + _filehMu
error _downErr; // !nil if connection is closed or no longer operational error _downErr; // !nil if connection is closed or no longer operational
dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment