Commit e8f67851 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6bf24d85
...@@ -137,15 +137,11 @@ ...@@ -137,15 +137,11 @@
// client process // client process
// //
// This creates the necessity to use RWMutex for locks that pinner and other // This creates the necessity to use RWMutex for locks that pinner and other
// parts of the code could be using at the same time in synchronous mode similar // parts of the code could be using at the same time in synchronous scenarious
// to the above. This locks are: // similar to the above. This locks are:
// //
// - Conn.atMu // - Conn.atMu
// - Conn.mu // - Conn.mu
//
//
// XXX link to bigfile/file_zodb.cpp to show how wcfs/client is used for
// ZBigFile on client side.
#include "wcfs_misc.h" #include "wcfs_misc.h"
...@@ -204,10 +200,8 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -204,10 +200,8 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
return wconn->_pinner(ctx); return wconn->_pinner(ctx);
}); });
// NOTE no need to wait till `wcfs/head/at ≥ at` because Conn.open does it. // need to wait till `wcfs/head/at ≥ at` because e.g. Conn.open stats stats
// FIXME ^^^ not right - Conn.open waits for wcfs/head/at only in the end // head/f to get f.headfsize.
// and stats head/f to get f.headfsize _before_ that.
// -> just wait here.
// XXX atMu.RLock ? // XXX atMu.RLock ?
err = wconn->_headWait(at); err = wconn->_headWait(at);
if (err != nil) { if (err != nil) {
...@@ -458,24 +452,25 @@ error _Conn::resync(zodb::Tid at) { ...@@ -458,24 +452,25 @@ error _Conn::resync(zodb::Tid at) {
_Conn& wconn = *this; _Conn& wconn = *this;
error err; error err;
wconn._atMu.Lock(); wconn._atMu.RLock();
xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at)); xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at));
wconn._atMu.Unlock(); wconn._atMu.RUnlock();
// XXX downErr -> E // XXX downErr -> E
// XXX at ^ (increases) // XXX at ^ (increases)
// first wait for wcfs/head to be >= at // first wait for wcfs/head to be >= at.
// we need this e.g. to be sure that head/f.size is at least as big that it will be @at state.
err = wconn._headWait(at); err = wconn._headWait(at);
if (err != nil) if (err != nil)
return err; return err; // XXX -> wconn down on err ?
// write-lock wconn._atMu . This excludes everything else, and in // write-lock wconn._atMu . This excludes everything else, and in
// particular _pinner_, from running and mutating files and mappings. // particular _pinner_, from running and mutating files and mappings.
// //
// NOTE we'll relock atMu as R in the second part of resync, so we prelock // NOTE we'll relock atMu as R in the second part of resync, so we prelock
// wconn._mu as well while under atMu.W, so that we know that set of opened // wconn._mu.R as well while under atMu.W, to be sure that set of opened
// files stays the same during whole resync. // files stays the same during whole resync.
bool atMuWLocked = true; bool atMuWLocked = true;
wconn._atMu.Lock(); wconn._atMu.Lock();
...@@ -500,17 +495,18 @@ error _Conn::resync(zodb::Tid at) { ...@@ -500,17 +495,18 @@ error _Conn::resync(zodb::Tid at) {
}); });
// set new wconn.at early, so that e.g. Conn.open running simultaneously // set new wconn.at early, so that e.g. Conn.open running simultaneously
// to second part of resync (see below) uses new at. XXX no need since wconn._mu is locked? // to second part of resync (see below) uses new at.
// XXX no need since wconn._mu is locked? -> no - it is *needed* after wconn.mu became RWMutex
wconn.at = at; wconn.at = at;
// go through all files opened under wconn and pre-adjust files and their // go through all files opened under wconn and pre-adjust their mappings
// mappings for viewing data as of new @at state. // for viewing data as of new @at state.
// //
// We are still holding atMu.W, so we are the only mutators of mappings, // We are still holding atMu.W, so we are the only mutators of mappings,
// because, in particular, pinner is not running. // because, in particular, pinner is not running.
// //
// Don't send watch updates for opened files to wcfs yet - without running // Don't send watch updates for opened files to wcfs yet - without running
// pinner those updates will be stuck. // pinner those updates will get stuck.
for (auto fit : wconn._filehTab) { for (auto fit : wconn._filehTab) {
//zodb::Oid foid = fit.first; //zodb::Oid foid = fit.first;
FileH f = fit.second; FileH f = fit.second;
...@@ -518,6 +514,8 @@ error _Conn::resync(zodb::Tid at) { ...@@ -518,6 +514,8 @@ error _Conn::resync(zodb::Tid at) {
// TODO if file has no mappings and was not used during whole prev // TODO if file has no mappings and was not used during whole prev
// cycle - forget and stop watching it // cycle - forget and stop watching it
// XXX not yet ready f ?
// update f._headfsize and remmap to head/f zero regions that are now covered by head/f // update f._headfsize and remmap to head/f zero regions that are now covered by head/f
struct stat st; struct stat st;
err = f->_headf->stat(&st); err = f->_headf->stat(&st);
...@@ -527,7 +525,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -527,7 +525,7 @@ error _Conn::resync(zodb::Tid at) {
if ((size_t)st.st_blksize != f->blksize) // blksize must not change if ((size_t)st.st_blksize != f->blksize) // blksize must not change
return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize)); return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));
auto headfsize = st.st_size; auto headfsize = st.st_size;
if (!(f->_headfsize <= headfsize)) // head/file size ↑= if (!(f->_headfsize <= headfsize)) // head/file size ↑=
return E(fmt::errorf("wcfs bug: head/file size not ↑=")); return E(fmt::errorf("wcfs bug: head/file size not ↑="));
if (!(headfsize % f->blksize == 0)) if (!(headfsize % f->blksize == 0))
return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0")); return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"));
...@@ -559,9 +557,10 @@ error _Conn::resync(zodb::Tid at) { ...@@ -559,9 +557,10 @@ error _Conn::resync(zodb::Tid at) {
// Now other calls, e.g. Conn.open, can be running simultaneously to us, // Now other calls, e.g. Conn.open, can be running simultaneously to us,
// but since we already set wconn.at to new value it is ok. For example // but since we already set wconn.at to new value it is ok. For example
// Conn.open, for not-yet-opened file, will use new at to send "watch". // Conn.open, for not-yet-opened file, will use new at to send "watch".
// XXX ^^^ not possible since wconn._mu is locked ? -> no, possible, wconn._mu became RWMutex // XXX ^^^ not possible since wconn._mu is locked ?
// -> no, possible, wconn._mu became RWMutex
// //
// XXX we are still holding wconn._mu locked, so wconn._filehTab is the // XXX we are still holding wconn._mu.R, so wconn._filehTab is the
// same as in previous pass above. // same as in previous pass above.
wconn._atMu.UnlockToRLock(); wconn._atMu.UnlockToRLock();
atMuWLocked = false; atMuWLocked = false;
...@@ -625,6 +624,9 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -625,6 +624,9 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
f->foid = foid; f->foid = foid;
f->_openReady = makechan<structZ>(); f->_openReady = makechan<structZ>();
f->_openErr = nil; f->_openErr = nil;
f->_headf = nil;
f->blksize = 0;
f->_headfsize = 0;
bool retok = false; bool retok = false;
wconn._filehTab[foid] = f; wconn._filehTab[foid] = f;
...@@ -674,7 +676,7 @@ error _FileH::_open() { ...@@ -674,7 +676,7 @@ error _FileH::_open() {
if (err != nil) if (err != nil)
return err; return err;
f->blksize = st.st_blksize; f->blksize = st.st_blksize;
f->_headfsize = st.st_size; // FIXME getting headfsize _before_ waiting for wcfs/head/at ≥ wconn.at f->_headfsize = st.st_size;
if (!(f->_headfsize % f->blksize == 0)) if (!(f->_headfsize % f->blksize == 0))
return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0", return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f->_headf->name()), f->_headfsize, f->blksize); v(f->_headf->name()), f->_headfsize, f->blksize);
......
...@@ -181,7 +181,6 @@ struct _Conn : object { ...@@ -181,7 +181,6 @@ struct _Conn : object {
sync::RWMutex _atMu; sync::RWMutex _atMu;
zodb::Tid at; zodb::Tid at;
// sync::Mutex _mu; // _atMu.W | _atMu.R + _mu
sync::RWMutex _mu; // _atMu.W | _atMu.R + _mu sync::RWMutex _mu; // _atMu.W | _atMu.R + _mu
error _downErr; // !nil if connection is closed or no longer operational error _downErr; // !nil if connection is closed or no longer operational
dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment