Commit 57489834 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ae23c2c3
......@@ -74,6 +74,8 @@ from .internal._wcfs import \
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
#
# WCFS logically mirrors ZODB.DB .
#
# XXX kill doc instead of C++.
class WCFS(_WCFS):
# .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
......
......@@ -40,18 +40,33 @@ using std::pair;
#include "wcfs_misc.h"
// from wendelin/bigfile/virtmem.h
extern "C" {
struct BigFileH;
}
// wcfs::
namespace wcfs {
struct _File;
struct _Mapping;
typedef refptr<struct _Conn> Conn;
typedef refptr<struct _Mapping> Mapping;
typedef refptr<struct _FileH> FileH;
typedef refptr<struct _WatchLink> WatchLink;
struct PinReq;
typedef refptr<class _Conn> Conn;
typedef refptr<class _WatchLink> WatchLink;
// WCFS represents filesystem-level connection to wcfs server.
// XXX doc
//
// XXX Use join to create it?
//
// The primary way to access wcfs is to open a logical connection that views
// on-wcfs data as of a particular database state, and to use that logical
// connection to create base-layer mappings. See .connect and Conn for details.
//
// XXX raw files?
//
// WCFS logically mirrors ZODB.DB .
struct WCFS {
string mountpoint;
......@@ -69,7 +84,7 @@ struct WCFS {
// cache in OS pagecache of /head/bigfile/*.
//
// Use WCFS.connect(at) to create Conn.
// Use .mmap to create new Mappings.
// Use .open to create new FileH.
// Use .resync to resync Conn onto different database view.
//
// Conn logically mirrors ZODB.Connection .
......@@ -79,8 +94,8 @@ struct _Conn : object {
zodb::Tid at;
WatchLink _wlink; // watch/receive pins for created mappings
sync::Mutex _filemu;
dict<zodb::Oid, _File*> _filetab; // {} foid -> _file
sync::Mutex _filehmu;
dict<zodb::Oid, FileH> _filehtab; // {} foid -> fileh
sync::WorkGroup _pinWG;
func<void()> _pinCancel;
......@@ -95,8 +110,8 @@ public:
public:
error close();
// XXX move mmap -> _FileH ?
pair<_Mapping*, error> mmap(zodb::Oid foid, int64_t blk_start, int64_t blk_len);
// XXX move mmap -> FileH
pair<Mapping, error> mmap(zodb::Oid foid, int64_t blk_start, int64_t blk_len);
error resync(zodb::Tid at);
private:
......@@ -104,6 +119,64 @@ private:
void _pin1(PinReq *req);
};
// FileH represents isolated file view under Conn.
//
// The file view is maintained to be as of @Conn.at database state.
// The file view uses /head/<file>/data primarily and @revX/<file>/data pin overrides.
//
// Use .mmap to map file view into memory.
typedef refptr<struct _FileH> FileH;
// _FileH is the per-file state a Conn keeps for one ZBigFile: the opened
// head/bigfile/<foid> file, its known size and block size, the set of blocks
// wcfs asked us to pin to older revisions, and all live mappings of the file.
//
// Instances are held through the FileH refptr and are stored in
// _Conn::_filehtab keyed by foid.
struct _FileH : object {
// connection this file handle belongs to (set by _Conn::mmap when the handle is created)
Conn wconn;
zodb::Oid _foid; // ZBigFile root object ID
// taken from st_blksize of head/file when the handle is created
size_t _blksize; // block size of this file XXX -> off_t ?
os::File _headf; // file object of head/file
// initialized from st_size of head/file and raised by _Conn::resync
off_t _headfsize; // head/file size is known to be at least headfsize (size ↑=)
// updated by _Conn::_pin1: entry is erased when a block is unpinned back to @head
dict<int64_t, zodb::Tid> _pinned; // {} blk -> rev that wcfs already sent us for this file
// _Conn::mmap appends here; NOTE(review): ↑blk_start ordering is marked XXX
// at the append site, so it is presumably not enforced yet - confirm.
vector<Mapping> _mmaps; // []Mapping ↑blk_start mappings of this file
// don't new - create via Conn.open
// (constructor/destructor are private; only the friend below may construct)
private:
_FileH();
~_FileH();
// _Conn::mmap currently is the only place that does `new _FileH()`.
// XXX -> friend Conn::open
friend pair<Mapping, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t blk_len);
public:
// NOTE(review): body is not visible here; presumably drops one reference
// and destroys the object when the count reaches zero - confirm.
void decref();
};
// Mapping represents one mapping of FileH.
typedef refptr<struct _Mapping> Mapping;
// _Mapping describes one read-only memory mapping of a FileH:
// blocks [blk_start, blk_stop()) of the file are mapped at [mem_start, mem_stop).
//
// Instances are held through the Mapping refptr and are registered in
// fileh->_mmaps by _Conn::mmap.
struct _Mapping : object {
// file handle this mapping views (gives access to _blksize, _headf, _pinned)
FileH fileh;
// expressed in blocks: the byte offset is blk_start * fileh->_blksize
int64_t blk_start; // offset of this mapping in file
BigFileH *virt_fileh; // mmapped under this virtmem file handle XXX -> VMA XXX can be nil
uint8_t *mem_start; // mmapped memory [mem_start, mem_stop)
uint8_t *mem_stop;
// blk_stop returns the block number just past the end of the mapping.
// It relies on the mapped length being a whole number of blocks; the check
// for that is currently disabled.
int64_t blk_stop() const {
//XXX reenable
//ASSERT((mem_stop - mem_start) % fileh->_blksize == 0);
return blk_start + (mem_stop - mem_start) / fileh->_blksize;
}
// _remmapblk remaps the memory of block blk to the file data as of revision at:
// head/file is used for at == TidHead, @<at>/bigfile/<foid> otherwise.
// A block that lies beyond the end of that file is mapped as zeros.
error _remmapblk(int64_t blk, zodb::Tid at);
// remmap_blk remaps block blk according to fileh->_pinned: to the pinned
// revision if there is one, else to @head.
// NOTE(review): only the revision lookup is visible here; presumably it then
// delegates to _remmapblk - confirm.
void remmap_blk(int64_t blk);
// unmap removes mapping memory from address space.
// virtmem calls this when VMA is unmapped.
// NOTE(review): the implementation calls std::remove on fileh->_mmaps and
// discards the returned iterator, so the vector is presumably not shrunk
// (erase-remove idiom missing) - confirm and fix.
void unmap();
// don't new - create via FileH.mmap
// (constructor/destructor are private; only the friend below may construct)
private:
_Mapping();
~_Mapping();
// _Conn::mmap currently is the only place that does `new _Mapping()`.
// XXX -> friend FileH.mmap
friend pair<Mapping, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t blk_len);
public:
// NOTE(review): body is not visible here; presumably drops one reference
// and destroys the object when the count reaches zero - confirm.
void decref();
};
} // wcfs::
......
......@@ -71,43 +71,6 @@ static error mmap_zero_into_ro(void *addr, size_t size);
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size);
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);
// _File represents isolated file view under Conn.
//
// XXX doc XXX naming -> _FileView ?
// XXX -> refptr ?
// _File is the per-file state a Conn keeps for one ZBigFile: the opened
// head/file, its known size and block size, pinned blocks and live mappings.
//
// It is a plain struct without a reference-counting base: _Conn::mmap creates
// it with `new _File()` and _Conn::_filetab stores it by raw pointer.
struct _File {
// connection this file belongs to
Conn wconn;
zodb::Oid foid; // ZBigFile root object ID
// taken from st_blksize of head/file when the file is first opened
size_t blksize; // block size of this file XXX -> off_t ?
os::File headf; // file object of head/file
// initialized from st_size of head/file and raised by _Conn::resync
off_t headfsize; // head/file size is known to be at least headfsize (size ↑=)
// updated by _Conn::_pin1: entry is erased when a block is unpinned back to @head
dict<int64_t, zodb::Tid> pinned; // {} blk -> rev that wcfs already sent us for this file
// raw, non-owning-looking pointers; NOTE(review): nothing visible here frees them - confirm ownership
vector<_Mapping*> mmaps; // []_Mapping ↑blk_start mappings of this file
};
// _Mapping represents one mapping of _File.
// XXX -> refptr ?
// _Mapping describes one read-only memory mapping of a _File:
// blocks [blk_start, blk_stop()) of the file are mapped at [mem_start, mem_stop).
struct _Mapping {
// file this mapping views (raw back-pointer; gives access to blksize, headf, pinned)
_File *file;
// expressed in blocks: the byte offset is blk_start * file->blksize
int64_t blk_start; // offset of this mapping in file
BigFileH *fileh; // mmapped under this file handle
uint8_t *mem_start; // mmapped memory [mem_start, mem_stop)
uint8_t *mem_stop;
// blk_stop returns the block number just past the end of the mapping.
// It relies on the mapped length being a whole number of blocks; the check
// for that is currently disabled.
int64_t blk_stop() const {
//XXX reenable
//ASSERT((mem_stop - mem_start) % file->blksize == 0);
return blk_start + (mem_stop - mem_start) / file->blksize;
}
// _remmapblk remaps the memory of block blk to the file data as of revision at:
// head/file is used for at == TidHead, @<at>/bigfile/<foid> otherwise.
// A block that lies beyond the end of that file is mapped as zeros.
error _remmapblk(int64_t blk, zodb::Tid at);
// remmap_blk remaps block blk according to file->pinned: to the pinned
// revision if there is one, else to @head.
// NOTE(review): only the revision lookup is visible here; presumably it then
// delegates to _remmapblk - confirm.
void remmap_blk(int64_t blk);
// unmap removes mapping memory from address space.
// virtmem calls this when VMA is unmapped.
void unmap();
};
// connect creates new Conn viewing WCFS state as of @at.
pair<Conn, error> WCFS::connect(zodb::Tid at) {
WCFS *wc = this;
......@@ -154,20 +117,20 @@ error _Conn::close() {
// close all files - both that have no mappings and that still have opened mappings.
// XXX after file is closed mappings continue to survive, but we can no
// longer maintain consistent view.
wconn._filemu.lock();
wconn._filehmu.lock();
defer([&]() {
wconn._filemu.unlock();
wconn._filehmu.unlock();
});
for (auto _ : wconn._filetab) {
for (auto _ : wconn._filehtab) {
auto f = _.second;
f->headf->close(); // XXX err
f->headf = nil;
f->_headf->close(); // XXX err
f->_headf = nil;
// XXX stop watching f
}
wconn._filetab.clear();
wconn._filehtab.clear();
return nil;
}
......@@ -226,19 +189,19 @@ void _Conn::_pin1(PinReq *req) {
defer(_)
#endif
_File *f;
FileH f;
bool ok;
wconn._filemu.lock();
tie(f, ok) = wconn._filetab.get(req->foid);
wconn._filehmu.lock();
tie(f, ok) = wconn._filehtab.get(req->foid);
if (!ok) {
wconn._filemu.unlock();
wconn._filehmu.unlock();
// XXX err = we are not watching the file - why wcfs sent us this update?
}
// XXX relock wconn -> f
for (auto mmap : f->mmaps) { // XXX use ↑blk_start for binary search
for (auto mmap : f->_mmaps) { // XXX use ↑blk_start for binary search
if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
continue; // blk ∉ mmap
......@@ -259,21 +222,21 @@ void _Conn::_pin1(PinReq *req) {
//trace("\t-> remmaped"); XXX
}
// update f.pinned
// update f._pinned
// XXX do it before ^^^ remmapblk (so that e.g. concurrent
// discard/writeout see correct f.pinned) ?
// discard/writeout see correct f._pinned) ?
if (req->at == TidHead) {
f->pinned.erase(req->blk); // unpin to @head
f->_pinned.erase(req->blk); // unpin to @head
}
else {
f->pinned[req->blk] = req->at;
f->_pinned[req->blk] = req->at;
}
wconn._filemu.unlock();
wconn._filehmu.unlock();
}
// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
pair<_Mapping*, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t blk_len) {
pair<Mapping, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t blk_len) {
_Conn& wconn = *this;
// XXX err ctx
// XXX (blk_start + blk_len) * blk_size overflow
......@@ -286,34 +249,34 @@ pair<_Mapping*, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t bl
int64_t blk_stop = blk_start + blk_len; // XXX overflow
wconn._filemu.lock();
wconn._filehmu.lock();
defer([&]() {
wconn._filemu.unlock();
wconn._filehmu.unlock();
});
_File* f; bool ok;
tie(f, ok) = wconn._filetab.get(foid);
FileH f; bool ok;
tie(f, ok) = wconn._filehtab.get(foid);
if (f == nil) {
f = new _File(); // XXX free f on error return (will be automatic if f = refptr)
f = adoptref(new _FileH());
f->wconn = newref(&wconn); // XXX newref -> simpler?
f->foid = foid;
tie(f->headf, err)
f->_foid = foid;
tie(f->_headf, err)
= wconn._wc->_open(fmt::sprintf("head/bigfile/%s", h_(foid)));
if (err != nil)
return make_pair(nil, err);
//f.pinned = {}
//f.mmaps = []
//f._pinned = {}
//f._mmaps = []
struct stat st;
err = f->headf->stat(&st);
err = f->_headf->stat(&st);
if (err != nil)
return make_pair(nil, err);
f->blksize = st.st_blksize;
f->headfsize = st.st_size;
if (!(f->headfsize % f->blksize == 0))
f->_blksize = st.st_blksize;
f->_headfsize = st.st_size;
if (!(f->_headfsize % f->_blksize == 0))
return make_pair(nil, fmt::errorf("wcfs bug: head/file size %% blksize != 0"));
wconn._filetab[foid] = f;
wconn._filehtab[foid] = f;
// start watching f
string ack;
......@@ -321,7 +284,7 @@ pair<_Mapping*, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t bl
if (err != nil)
return make_pair(nil, err);
if (ack != "ok") {
// XXX unregister f from _filetab
// XXX unregister f from _filehtab
// XXX vvv -> errctx?
return make_pair(nil, fmt::errorf("@%s: mmap f<%s>[%zd +%zd): %s", h_(wconn.at), h_(foid), blk_start, blk_len, ack.c_str()));
}
......@@ -331,28 +294,28 @@ pair<_Mapping*, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t bl
// create memory with head/f mapping and applied pins
// mmap-in zeros after f.size (else access to memory after file.size will raise SIGBUS)
int64_t start = blk_start*f->blksize;
int64_t start = blk_start*f->_blksize;
uint8_t *mem_start, *mem_stop;
tie(mem_start, err) = mmap_ro(f->headf, start, blk_len*f->blksize);
tie(mem_start, err) = mmap_ro(f->_headf, start, blk_len*f->_blksize);
if (err != nil)
return make_pair(nil, err);
mem_stop = mem_start + blk_len*f->blksize;
mem_stop = mem_start + blk_len*f->_blksize;
int64_t stop = blk_stop*f->blksize;
if (stop > f->headfsize) {
uint8_t *zmem_start = mem_start + (max(f->headfsize/*XXX -1 ?*/, start) - start);
int64_t stop = blk_stop*f->_blksize;
if (stop > f->_headfsize) {
uint8_t *zmem_start = mem_start + (max(f->_headfsize/*XXX -1 ?*/, start) - start);
err = mmap_zero_into_ro(zmem_start, mem_stop - zmem_start);
if (err != nil)
return make_pair(nil, err);
}
_Mapping* mmap = new _Mapping();
mmap->file = f;
Mapping mmap = adoptref(new _Mapping());
mmap->fileh = f;
mmap->blk_start = blk_start;
mmap->mem_start = mem_start;
mmap->mem_stop = mem_stop;
for (auto _ : f->pinned) { // XXX keep f.pinned ↑blk and use binary search?
for (auto _ : f->_pinned) { // XXX keep f._pinned ↑blk and use binary search?
int64_t blk = _.first;
zodb::Tid rev = _.second;
if (!(blk_start <= blk && blk < blk_stop))
......@@ -360,7 +323,7 @@ pair<_Mapping*, error> _Conn::mmap(zodb::Oid foid, int64_t blk_start, int64_t bl
mmap->_remmapblk(blk, rev); // XXX err?
}
f->mmaps.push_back(mmap); // XXX keep f.mmaps ↑blk_start
f->_mmaps.push_back(mmap); // XXX keep f._mmaps ↑blk_start
return make_pair(mmap, nil);
}
......@@ -372,42 +335,42 @@ error _Conn::resync(zodb::Tid at) {
// XXX locking
for (auto fit : wconn._filetab) {
for (auto fit : wconn._filehtab) {
zodb::Oid foid = fit.first;
_File &f = *fit.second;
FileH f = fit.second;
// XXX if file has no mappings and was not used during whole prev
// cycle - forget and stop watching it
// update f.headfsize and remmap to head/f zero regions that are now covered by head/f
// update f._headfsize and remmap to head/f zero regions that are now covered by head/f
struct stat st;
error err = f.headf->stat(&st);
error err = f->_headf->stat(&st);
if (err != nil)
return err;
if ((size_t)st.st_blksize != f.blksize) // blksize must not change
return fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f.blksize, st.st_blksize);
if ((size_t)st.st_blksize != f->_blksize) // blksize must not change
return fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->_blksize, st.st_blksize);
auto headfsize = st.st_size;
if (!(f.headfsize <= headfsize)) // head/file size ↑=
if (!(f->_headfsize <= headfsize)) // head/file size ↑=
return fmt::errorf("wcfs bug: head/file size not ↑=");
if (!(headfsize % f.blksize == 0))
if (!(headfsize % f->_blksize == 0))
return fmt::errorf("wcfs bug: head/file size %% blksize != 0");
for (auto mmap : f.mmaps) {
printf(" resync -> %s: unzero [%lu:%lu)", h_(at), f.headfsize/f.blksize, headfsize/f.blksize);
for (auto mmap : f->_mmaps) {
printf(" resync -> %s: unzero [%lu:%lu)", h_(at), f->_headfsize/f->_blksize, headfsize/f->_blksize);
uint8_t *mem_unzero_start = min(mmap->mem_stop,
mmap->mem_start + (f.headfsize - mmap->blk_start*f.blksize));
mmap->mem_start + (f->_headfsize - mmap->blk_start*f->_blksize));
uint8_t *mem_unzero_stop = min(mmap->mem_stop,
mmap->mem_start + (headfsize - mmap->blk_start*f.blksize));
mmap->mem_start + (headfsize - mmap->blk_start*f->_blksize));
if (mem_unzero_stop - mem_unzero_start > 0) {
err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f.headf, f.headfsize);
err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize);
if (err != nil)
return err;
}
}
f.headfsize = headfsize;
f->_headfsize = headfsize;
string ack;
......@@ -415,7 +378,7 @@ error _Conn::resync(zodb::Tid at) {
if (err != nil)
return err;
if (ack != "ok") {
// XXX unregister f from _filetab
// XXX unregister f from _filehtab
// XXX vvv -> errctx?
return fmt::errorf("resync @%s -> @%s: f<%s>: %s", h_(wconn.at), h_(at), h_(foid), ack.c_str());
}
......@@ -435,19 +398,19 @@ error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
//XXX reenable
//ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop());
_File *f = mmap->file;
FileH f = mmap->fileh;
error err;
uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->blksize;
uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->_blksize;
os::File fsfile;
bool fclose = false;
if (at == TidHead) {
fsfile = f->headf;
fsfile = f->_headf;
}
else {
// TODO share @rev fd until wconn is resynced?
tie(fsfile, err) = f->wconn->_wc->_open(
fmt::sprintf("@%s/bigfile/%s", h_(at), h_(f->foid)));
fmt::sprintf("@%s/bigfile/%s", h_(at), h_(f->_foid)));
if (err != nil)
return err;
fclose = true;
......@@ -461,19 +424,19 @@ error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
err = fsfile->stat(&st);
if (err != nil)
return err;
if ((size_t)st.st_blksize != f->blksize)
return fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize);
if ((size_t)st.st_blksize != f->_blksize)
return fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->_blksize, st.st_blksize);
// block is beyond file size - mmap with zeros - else access to memory
// after file.size will raise SIGBUS. (assumes head/f size ↑=)
if ((blk+1)*f->blksize > (size_t)st.st_size) {
err = mmap_zero_into_ro(blkmem, 1*f->blksize);
if ((blk+1)*f->_blksize > (size_t)st.st_size) {
err = mmap_zero_into_ro(blkmem, 1*f->_blksize);
if (err != nil)
return err;
}
// block is inside file - mmap file data
else {
err = mmap_into_ro(blkmem, 1*f->blksize, fsfile, blk*f->blksize);
err = mmap_into_ro(blkmem, 1*f->_blksize, fsfile, blk*f->_blksize);
if (err != nil)
return err;
}
......@@ -492,7 +455,7 @@ void _Mapping::remmap_blk(int64_t blk) {
// blkrev = rev | @head
zodb::Tid blkrev; bool ok;
tie(blkrev, ok) = mmap.file->pinned.get(blk);
tie(blkrev, ok) = mmap.fileh->_pinned.get(blk);
if (!ok)
blkrev = TidHead;
......@@ -502,7 +465,7 @@ void _Mapping::remmap_blk(int64_t blk) {
// unmap removes mapping memory from address space.
// virtmem calls this when VMA is unmapped.
void _Mapping::unmap() {
_Mapping* mmap = this;
Mapping mmap = newref(this); // XXX newref for std::remove
// XXX locking
......@@ -513,9 +476,9 @@ void _Mapping::unmap() {
mmap->mem_stop = NULL;
// XXX clear other fields?
_File *f = mmap->file;
//f->mmaps.remove(mmap);
std::remove(f->mmaps.begin(), f->mmaps.end(), mmap); // XXX keep mmaps sorted
FileH f = mmap->fileh;
//f->_mmaps.remove(mmap);
std::remove(f->_mmaps.begin(), f->_mmaps.end(), mmap); // XXX keep mmaps sorted
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment