Commit 9212b13c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e6e6644a
...@@ -155,7 +155,7 @@ error _Conn::close() { ...@@ -155,7 +155,7 @@ error _Conn::close() {
// longer maintain consistent view. For this reason we change mappings to // longer maintain consistent view. For this reason we change mappings to
// something that gives EFAULT on access. XXX implement // something that gives EFAULT on access. XXX implement
for (auto _ : wconn._filehtab) { for (auto _ : wconn._filehTab) {
auto f = _.second; auto f = _.second;
err = f->_headf->close(); // XXX mark fileh as down so that fileh.close does not say "bad fd" err = f->_headf->close(); // XXX mark fileh as down so that fileh.close does not say "bad fd"
if (err != nil) if (err != nil)
...@@ -165,7 +165,7 @@ error _Conn::close() { ...@@ -165,7 +165,7 @@ error _Conn::close() {
// XXX stop watching f XXX ok under mu? // XXX stop watching f XXX ok under mu?
} }
wconn._filehtab.clear(); wconn._filehTab.clear();
return E(eret); return E(eret);
} }
...@@ -258,7 +258,7 @@ error _Conn::__pin1(PinReq *req) { ...@@ -258,7 +258,7 @@ error _Conn::__pin1(PinReq *req) {
wconn._mu.lock(); wconn._mu.lock();
// XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ? // XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ?
// XXX or just make FileH.close lock f too to synchronizw with pinner? // XXX or just make FileH.close lock f too to synchronizw with pinner?
tie(f, ok) = wconn._filehtab.get_(req->foid); tie(f, ok) = wconn._filehTab.get_(req->foid);
if (!ok) { if (!ok) {
wconn._mu.unlock(); wconn._mu.unlock();
// why wcfs sent us this update? // why wcfs sent us this update?
...@@ -266,9 +266,9 @@ error _Conn::__pin1(PinReq *req) { ...@@ -266,9 +266,9 @@ error _Conn::__pin1(PinReq *req) {
} }
wconn._mu.unlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close? wconn._mu.unlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
f._mu.lock(); f->_mu.lock();
defer([&]() { defer([&]() {
f._mu.unlock(); f->_mu.unlock();
}); });
for (auto mmap : f->_mmaps) { // TODO use ↑blk_start for binary search for (auto mmap : f->_mmaps) { // TODO use ↑blk_start for binary search
...@@ -341,13 +341,13 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -341,13 +341,13 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
} }
FileH f; bool ok; FileH f; bool ok;
tie(f, ok) = wconn._filehtab.get_(foid); tie(f, ok) = wconn._filehTab.get_(foid);
if (ok) { if (ok) {
wconn._mu.unlock(); wconn._mu.unlock();
f._openReady.recv(); f->_openReady.recv();
if (f._openErr != nil) if (f->_openErr != nil)
return make_pair(nil, E(f._openErr)); return make_pair(nil, E(f->_openErr));
// XXX incref open count // XXX incref open count
return make_pair(f, nil); return make_pair(f, nil);
...@@ -361,32 +361,35 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -361,32 +361,35 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
f->_openErr = nil; f->_openErr = nil;
bool retok = false; bool retok = false;
wconn._filehtab[foid] = f; wconn._filehTab[foid] = f;
defer([&]() { defer([&]() {
if (!retok) { if (!retok) {
wconn._mu.lock(); wconn._mu.lock();
wconn._filehtab.erase(foid); wconn._filehTab.erase(foid);
wconn._mu.unlock(); wconn._mu.unlock();
} }
f._openReady.close(); f->_openReady.close();
}); });
wconn._mu.unlock(); wconn._mu.unlock();
f._openErr = f._open(); f->_openErr = f->_open();
if (f._openErr != nil) if (f->_openErr != nil)
return make_pair(nil, E(f._openErr)); return make_pair(nil, E(f->_openErr));
retok = true; retok = true;
return make_pair(f, nil); return make_pair(f, nil);
} }
// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
error _FileH::_open() { error _FileH::_open() {
_FileH* f = this; _FileH* f = this;
Conn wconn = f->wconn;
error err;
tie(f->_headf, err) tie(f->_headf, err)
= wconn._wc->_open(fmt::sprintf("head/bigfile/%s", v(foid))); = wconn->_wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
if (err != nil) if (err != nil)
return make_pair(nil, err); return err;
bool retok = false; bool retok = false;
defer([&]() { defer([&]() {
...@@ -397,25 +400,25 @@ error _FileH::_open() { ...@@ -397,25 +400,25 @@ error _FileH::_open() {
struct stat st; struct stat st;
err = f->_headf->stat(&st); err = f->_headf->stat(&st);
if (err != nil) if (err != nil)
return make_pair(nil, err); return err;
f->blksize = st.st_blksize; f->blksize = st.st_blksize;
f->_headfsize = st.st_size; f->_headfsize = st.st_size;
if (!(f->_headfsize % f->blksize == 0)) if (!(f->_headfsize % f->blksize == 0))
return make_pair(nil, fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0", return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f->_headf->name()), f->_headfsize, f->blksize)); v(f->_headf->name()), f->_headfsize, f->blksize);
// start watching f // start watching f
// NOTE we are _not_ holding wconn.mu nor f.mu XXX wconn.atMu? // NOTE we are _not_ holding wconn.mu nor f.mu XXX wconn.atMu?
string ack; string ack;
tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn.at))); tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn->at)));
if (err != nil) if (err != nil)
return make_pair(nil, err); return err;
if (ack != "ok") { if (ack != "ok") {
return make_pair(nil, fmt::errorf("watch: %s", v(ack))); return fmt::errorf("watch: %s", v(ack));
} }
retok = true; retok = true;
return make_pair(f, nil); return nil;
} }
// close releases resources associated with FileH. // close releases resources associated with FileH.
...@@ -429,13 +432,13 @@ error _FileH::close() { ...@@ -429,13 +432,13 @@ error _FileH::close() {
// XXX change all fileh.mmaps to cause EFAULT on any access after fileh.close // XXX change all fileh.mmaps to cause EFAULT on any access after fileh.close
// XXX "watch foid -" -> wconn.wlink (stop watchingthe file) // XXX "watch foid -" -> wconn.wlink (stop watchingthe file)
// remove fileh from wconn._filehtab // remove fileh from wconn._filehTab
// fileh.close can be called several times and after first call another // fileh.close can be called several times and after first call another
// fileh could be opened for the same foid. Be careful not to erase it. // fileh could be opened for the same foid. Be careful not to erase it.
wconn->_filehmu.lock(); wconn->_filehmu.lock();
// XXX decref open count // XXX decref open count
if (wconn->_filehtab.get(fileh.foid)._ptr() == &fileh) if (wconn->_filehTab.get(fileh.foid)._ptr() == &fileh)
wconn->_filehtab.erase(fileh.foid); wconn->_filehTab.erase(fileh.foid);
wconn->_filehmu.unlock(); wconn->_filehmu.unlock();
return E(fileh._headf->close()); return E(fileh._headf->close());
...@@ -592,7 +595,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -592,7 +595,7 @@ error _Conn::resync(zodb::Tid at) {
}); });
for (auto fit : wconn._filehtab) { for (auto fit : wconn._filehTab) {
zodb::Oid foid = fit.first; zodb::Oid foid = fit.first;
FileH f = fit.second; FileH f = fit.second;
......
...@@ -175,7 +175,7 @@ struct _Conn : object { ...@@ -175,7 +175,7 @@ struct _Conn : object {
// knows noone is using the connection for reading simultaneously. // knows noone is using the connection for reading simultaneously.
// //
// XXX deadlock with pinner? -> use deadlock-avoidance similar to zwatcher in wcfs.go // XXX deadlock with pinner? -> use deadlock-avoidance similar to zwatcher in wcfs.go
sync::RWMutex _atMu sync::RWMutex _atMu;
zodb::Tid at; zodb::Tid at;
// XXX kill downMu? (move under filehmu so that e.g. .open() can check downErr without race) // XXX kill downMu? (move under filehmu so that e.g. .open() can check downErr without race)
...@@ -222,8 +222,12 @@ typedef refptr<struct _FileH> FileH; ...@@ -222,8 +222,12 @@ typedef refptr<struct _FileH> FileH;
struct _FileH : object { struct _FileH : object {
Conn wconn; Conn wconn;
zodb::Oid foid; // ZBigFile root object ID (does not change after fileh open) zodb::Oid foid; // ZBigFile root object ID (does not change after fileh open)
size_t blksize; // block size of this file (does not change after fileh open)
os::File _headf; // file object of head/file chan<structZ> _openReady; // in-flight open completed
error _openErr; // error result from open
os::File _headf; // file object of head/file
size_t blksize; // block size of this file (does not change after fileh open)
// head/file size is known to be at least headfsize (size ↑=) // head/file size is known to be at least headfsize (size ↑=)
// protected by .wconn._atMu // protected by .wconn._atMu
...@@ -244,6 +248,8 @@ public: ...@@ -244,6 +248,8 @@ public:
public: public:
error close(); error close();
pair<Mapping, error> mmap(int64_t blk_start, int64_t blk_len, VMA *vma=nil); pair<Mapping, error> mmap(int64_t blk_start, int64_t blk_len, VMA *vma=nil);
error _open();
}; };
// Mapping represents one memory mapping of FileH. // Mapping represents one memory mapping of FileH.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment