Commit c97596f1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 158a0c1c
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
// XXX locking -> explain atMu + slaves and refer to "Locking" in wcfs.go // XXX locking -> explain atMu + slaves and refer to "Locking" in wcfs.go
// //
// Conn.atMu > Conn.mu > FileH.mu
//
// XXX link to bigfile/file_zodb.cpp to show how wcfs/client is used for // XXX link to bigfile/file_zodb.cpp to show how wcfs/client is used for
// ZBigFile on client side. // ZBigFile on client side.
...@@ -107,6 +109,11 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -107,6 +109,11 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
return wconn->_pinner(ctx); return wconn->_pinner(ctx);
}); });
// NOTE no need to wait till `wcfs/head/at ≥ at` because Conn.open does it.
// FIXME ^^^ not right - Conn.open waits for wcfs/head/at only in the end
// and stats head/f to get f.headfsize _before_ that.
// -> just wait here.
return make_pair(wconn, nil); return make_pair(wconn, nil);
} }
...@@ -271,6 +278,8 @@ error _Conn::__pin1(PinReq *req) { ...@@ -271,6 +278,8 @@ error _Conn::__pin1(PinReq *req) {
return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid)); return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
} }
// XXX <- f._openReady ?
wconn._mu.unlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close? wconn._mu.unlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
f->_mu.lock(); f->_mu.lock();
defer([&]() { defer([&]() {
...@@ -417,13 +426,14 @@ error _FileH::_open() { ...@@ -417,13 +426,14 @@ error _FileH::_open() {
if (err != nil) if (err != nil)
return err; return err;
f->blksize = st.st_blksize; f->blksize = st.st_blksize;
f->_headfsize = st.st_size; f->_headfsize = st.st_size; // FIXME getting headfsize _before_ waiting for wcfs/head/at ≥ wconn.at
if (!(f->_headfsize % f->blksize == 0)) if (!(f->_headfsize % f->blksize == 0))
return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0", return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f->_headf->name()), f->_headfsize, f->blksize); v(f->_headf->name()), f->_headfsize, f->blksize);
// start watching f // start watching f
// NOTE we are _not_ holding wconn.mu nor f.mu - only wconn.atMu to rely on wconn.at being stable. // NOTE we are _not_ holding wconn.mu nor f.mu - only wconn.atMu to rely on wconn.at being stable.
// NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
string ack; string ack;
tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn->at))); tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn->at)));
if (err != nil) if (err != nil)
...@@ -585,34 +595,58 @@ error _Mapping::unmap() { ...@@ -585,34 +595,58 @@ error _Mapping::unmap() {
// resync resyncs connection and its mappings onto different database view. // resync resyncs connection and its mappings onto different database view.
// XXX place=? // XXX place=? -> closer to pinner & connect
error _Conn::resync(zodb::Tid at) { error _Conn::resync(zodb::Tid at) {
_Conn& wconn = *this; _Conn& wconn = *this;
// XXX locking // FIXME first wait for wcfs/head to be >= at
// wconn.atMu.Lock()
// defer wconn.atMu.Unlock() // write-lock wconn._atMu . This excludes everything else, and in
// particular _pinner_, from running and mutating files and mappings.
//
// NOTE we'll relock atMu as R in the second part of resync, so we prelock
// wconn._mu as well while under atMu.W, so that we know that set of opened
// files stays the same during whole resync.
bool atMuWLocked = true;
wconn._atMu.Lock();
wconn._mu.lock();
defer([&]() {
wconn._mu.unlock();
if (atMuWLocked)
wconn._atMu.Unlock();
else
wconn._atMu.RUnlock();
});
xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at)); xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at));
error err; error err;
// wconn._downMu.lock(); XXX
err = wconn._downErr; err = wconn._downErr;
// wconn._downMu.unlock(); XXX
if (err != nil) if (err != nil)
return E(err); return E(err);
bool retok = false; bool retok = false;
defer([&]() { defer([&]() {
if (!retok) if (!retok)
; // XXX bring wconn + fileh + mmaps down on errror panic("TODO: bring wconn + fileh + mmaps down on errror"); // XXX
}); });
// set new wconn.at early, so that e.g. Conn.open running simultaneously
// to second part of resync (see below) uses new at. XXX no need since wconn._mu is locked?
wconn.at = at;
// go through all files opened under wconn and pre-adjust files and their
// mappings for viewing data as of new @at state.
//
// We are still holding atMu.W, so we are the only mutators of mappings,
// because, in particular, pinner is not running.
//
// Don't send watch updates for opened files to wcfs yet - without running
// pinner those updates will be stuck.
for (auto fit : wconn._filehTab) { for (auto fit : wconn._filehTab) {
zodb::Oid foid = fit.first; //zodb::Oid foid = fit.first;
FileH f = fit.second; FileH f = fit.second;
// TODO if file has no mappings and was not used during whole prev // TODO if file has no mappings and was not used during whole prev
// cycle - forget and stop watching it // cycle - forget and stop watching it
...@@ -632,11 +666,11 @@ error _Conn::resync(zodb::Tid at) { ...@@ -632,11 +666,11 @@ error _Conn::resync(zodb::Tid at) {
return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0")); return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"));
for (auto mmap : f->_mmaps) { for (auto mmap : f->_mmaps) {
printf(" resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize); //printf(" resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize);
uint8_t *mem_unzero_start = min(mmap->mem_stop, uint8_t *mem_unzero_start = min(mmap->mem_stop,
mmap->mem_start + (f->_headfsize - mmap->blk_start*f->blksize)); mmap->mem_start + (f->_headfsize - mmap->blk_start*f->blksize));
uint8_t *mem_unzero_stop = min(mmap->mem_stop, uint8_t *mem_unzero_stop = min(mmap->mem_stop,
mmap->mem_start + (headfsize - mmap->blk_start*f->blksize)); mmap->mem_start + ( headfsize - mmap->blk_start*f->blksize));
if (mem_unzero_stop - mem_unzero_start > 0) { if (mem_unzero_stop - mem_unzero_start > 0) {
err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize); err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize);
...@@ -646,7 +680,30 @@ error _Conn::resync(zodb::Tid at) { ...@@ -646,7 +680,30 @@ error _Conn::resync(zodb::Tid at) {
} }
f->_headfsize = headfsize; f->_headfsize = headfsize;
}
// atomically downgrade atMu.W to atMu.R before issuing watch updates to wcfs.
// - we need atMu to be not Wlocked, because under atMu.W pinner cannot run simultanesouly to us.
// - we need to hold atMu.R to avoid race wrt e.g. other resync which changes at.
// - we cannot just do regular `atMu.Unlock + atMu.RLock()` because then
// there is a race window inbetween Unlock and RLock where wconn.at can be changed.
// XXX also deadlock, because it will become wconn._mu.lock + wconn._atMu lock
//
// Now other calls, e.g. Conn.open, can be running simultaneously to us,
// but since we already set wconn.at to new value it is ok. For example
// Conn.open, for not-yet-opened file, will use new at to send "watch".
// XXX ^^^ not possible since wconn._mu is locked ?
//
// XXX we are still holding wconn._mu locked, so wconn._filehTab is the
// same as in previous pass above.
wconn._atMu.UnlockAndRLock(); // XXX name
atMuWLocked = false;
// send watch updates to wcfs.
// the pinner is now running and will be able to serve pin requests triggered by out watch.
for (auto fit : wconn._filehTab) {
zodb::Oid foid = fit.first;
//FileH f = fit.second;
string ack; string ack;
tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(at))); tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(at)));
...@@ -657,7 +714,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -657,7 +714,7 @@ error _Conn::resync(zodb::Tid at) {
} }
} }
wconn.at = at; // wconn.at = at;
retok = true; retok = true;
return nil; return nil;
} }
......
...@@ -180,10 +180,6 @@ struct _Conn : object { ...@@ -180,10 +180,6 @@ struct _Conn : object {
sync::RWMutex _atMu; sync::RWMutex _atMu;
zodb::Tid at; zodb::Tid at;
// XXX kill downMu? (move under filehmu so that e.g. .open() can check downErr without race)
// sync::Mutex _downMu;
// error _downErr; // !nil if connection is closed or no longer operational
sync::Mutex _mu; // _atMu.W | _atMu.R + _mu sync::Mutex _mu; // _atMu.W | _atMu.R + _mu
error _downErr; // !nil if connection is closed or no longer operational error _downErr; // !nil if connection is closed or no longer operational
dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment