// Copyright (C) 2018-2020 Nexedi SA and Contributors. // Kirill Smelkov <kirr@nexedi.com> // // This program is free software: you can Use, Study, Modify and Redistribute // it under the terms of the GNU General Public License version 3, or (at your // option) any later version, as published by the Free Software Foundation. // // You can also Link and Combine this program with other software covered by // the terms of any of the Free Software licenses or any of the Open Source // Initiative approved licenses and Convey the resulting work. Corresponding // source of such a combination shall include the source code for all other // software used. // // This program is distributed WITHOUT ANY WARRANTY; without even the implied // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // // See COPYING file for full licensing terms. // See https://www.nexedi.com/licensing for rationale and options. // Package wcfs provides WCFS client integrated with user-space virtual memory manager. // See wcfs.h for package overview. // Wcfs client organization // // - need to maintain pinner map registry // // [blk< +len) -> vma ↓blk // #blk -> []vma covering it // // - we can reuse virtmem code: // // XXX review text // // if RW page is added to mapping, pinner does not need to split the mmap - // it just needs to check before overwriting that page with @revX/data // whether RAM page was mmaped by virtmem or not. If RW page is already // there, pinner does not overwrite it. // // RW -> Uptodate: Virtmem calls pinner to remmap the page RO after commit or // abort. // // // VMA -> fileh -> file // ↑↓ ↑↓ (XXX here ?) // Mapping FileH // #include "wcfs_misc.h" #include "wcfs.h" #include "wcfs_watchlink.h" #include <wendelin/bigfile/virtmem.h> #include <wendelin/bigfile/ram.h> #include <golang/errors.h> #include <golang/fmt.h> #include <golang/io.h> #include <algorithm> #include <string> #include <vector> #include <sys/types.h> #include <sys/mman.h> #include <sys/stat.h> #include <unistd.h> using std::min; using std::max; using std::vector; // wcfs:: namespace wcfs { static error mmap_zero_into_ro(void *addr, size_t size); static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size); static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset); // connect creates new Conn viewing WCFS state as of @at. pair<Conn, error> WCFS::connect(zodb::Tid at) { WCFS *wc = this; xerr::Contextf E("wcfs %s: connect @%s", v(wc->mountpoint), v(at)); // TODO support !isolated mode WatchLink wlink; error err; tie(wlink, err) = wc->_openwatch(); if (err != nil) return make_pair(nil, E(err)); Conn wconn = adoptref(new _Conn()); wconn->_wc = wc; wconn->at = at; wconn->_wlink = wlink; context::Context pinCtx; tie(pinCtx, wconn->_pinCancel) = context::with_cancel(context::background()); wconn->_pinWG = sync::NewWorkGroup(pinCtx); wconn->_pinWG->go([wconn](context::Context ctx) -> error { return wconn->_pinner(ctx); }); return make_pair(wconn, nil); } static global<error> errConnClosed = errors::New("connection closed"); // close releases resources associated with wconn. // // opened fileh and mappings become invalid to use except close and unmap. error _Conn::close() { _Conn& wconn = *this; xerr::Contextf E("wcfs %s: close conn @%s", v(wconn._wc->mountpoint), v(wconn.at)); // XXX + conn # e.g. from wconn._wlink.id? or wlink.close should include its id itself? error err, eret; auto reterr1 = [&eret](error err) { if (eret == nil && err != nil) eret = err; }; wconn._downMu.lock(); wconn._downErr = errConnClosed; // XXX ok to change even if it was !nil before? wconn._downMu.unlock(); err = wconn._wlink->close(); if (err != nil) reterr1(err); wconn._pinCancel(); err = wconn._pinWG->wait(); if (!errors::Is(err, context::canceled)) // canceled - ok reterr1(err); // close all files - both that have no mappings and that still have opened mappings. // // NOTE after file is closed mappings could continue to survive, but we can no // longer maintain consistent view. For this reason we change mappings to // something that gives EFAULT on access. XXX implement wconn._filehmu.lock(); defer([&]() { wconn._filehmu.unlock(); }); for (auto _ : wconn._filehtab) { auto f = _.second; err = f->_headf->close(); // XXX mark fileh as down so that fileh.close does not say "bad fd" if (err != nil) reterr1(err); f->_headf = nil; // XXX stop watching f } wconn._filehtab.clear(); return E(eret); } // _pinner receives pin messages from wcfs and adjusts wconn mappings. error _Conn::_pinner(context::Context ctx) { _Conn& wconn = *this; error err = wconn.__pinner(ctx); // if pinner fails, wcfs will kill us. // log pinner error so the error is not hidden. // print to stderr as well as by default log does not print to there. // XXX also catch panic/exc ? if (!(err == nil || errors::Is(err, context::canceled))) { // canceled = .close asks pinner to stop log::Fatalf("CRITICAL: %s", v(err)); log::Fatalf("CRITICAL: wcfs server will likely kill us soon."); fprintf(stderr, "CRITICAL: %s\n", v(err)); fprintf(stderr, "CRITICAL: wcfs server will likely kill us soon.\n"); } // mark the connection non-operational if the pinner fails wconn._downMu.lock(); if (wconn._downErr == nil) { wconn._downErr = fmt::errorf("no longer operational due to: %w", err); // XXX make all fileh and mapping invalid. } wconn._downMu.unlock(); return err; } error _Conn::__pinner(context::Context ctx) { _Conn& wconn = *this; xerr::Contextf E("pinner"); // NOTE pinner error goes to Conn::close who has its own context PinReq req; error err; while (1) { err = wconn._wlink->recvReq(ctx, &req); if (err != nil) { // it is ok if we receive EOF due to us closing the connection if (err == io::EOF_) { wconn._downMu.lock(); err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF; wconn._downMu.unlock(); } return E(err); } // we received request to pin/unpin file block. handle it err = wconn._pin1(&req); if (err != nil) { return E(err); } } } // pin1 handles one pin request received from wcfs. error _Conn::_pin1(PinReq *req) { _Conn& wconn = *this; xerr::Contextf E("pin f<%s> #%ld @%s", v(req->foid), req->blk, v(req->at)); error err = wconn.__pin1(req); // reply either ack or nak on error string ack = "ack"; if (err != nil) ack = fmt::sprintf("nak: %s", v(err)); error err2 = wconn._wlink->replyReq(context::background(), req, ack); // XXX ctx ok? if (err == nil) err = err2; return E(err); } error _Conn::__pin1(PinReq *req) { _Conn& wconn = *this; FileH f; bool ok; wconn._filehmu.lock(); tie(f, ok) = wconn._filehtab.get_(req->foid); if (!ok) { wconn._filehmu.unlock(); return fmt::errorf("unexpected pin: file not watched"); // why wcfs sent us this update? } // XXX relock wconn -> f for (auto mmap : f->_mmaps) { // XXX use ↑blk_start for binary search if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop())) continue; // blk ∉ mmap //trace("\tremmapblk %d @%s" % (req->blk, (v(req.at) if req.at else "head"))) // pin only if virtmem did not dirtied page corresponding to this block already // if virtmem dirtied the page - it will ask us to remmap it again after commit or abort. bool do_pin= true; error err; if (mmap->vma != NULL) { virt_lock(); BigFileH *virt_fileh = mmap->vma->fileh; TODO (mmap->fileh->blksize != virt_fileh->ramh->ram->pagesize); do_pin = !__fileh_page_isdirty(virt_fileh, req->blk); } if (do_pin) err = mmap->_remmapblk(req->blk, req->at); if (mmap->vma != NULL) virt_unlock(); // on error don't need to continue with other mappings - all fileh and // all mappings become marked invalid on pinner failure. // XXX all call wconn._down from here under wconn._filehmu lock? if (err != nil) return err; //trace("\t-> remmaped"); XXX } // update f._pinned // XXX do it before ^^^ remmapblk (so that e.g. concurrent // discard/writeout see correct f._pinned) ? if (req->at == TidHead) { f->_pinned.erase(req->blk); // unpin to @head } else { f->_pinned[req->blk] = req->at; } wconn._filehmu.unlock(); return nil; } // open opens FileH corresponding to ZBigFile foid. pair<FileH, error> _Conn::open(zodb::Oid foid) { _Conn& wconn = *this; error err; xerr::Contextf E("wcfs %s: conn @%s: open f<%s>", v(wconn._wc->mountpoint), v(wconn.at), v(foid)); wconn._filehmu.lock(); defer([&]() { wconn._filehmu.unlock(); }); if (wconn._downErr != nil) // XXX under filehmu or downMu ? return make_pair(nil, E(wconn._downErr)); FileH f; bool ok; tie(f, ok) = wconn._filehtab.get_(foid); if (ok) { // XXX incref open count return make_pair(f, nil); } // TODO perform open with filehmu released f = adoptref(new _FileH()); f->wconn = newref(&wconn); // XXX newref -> simpler? f->foid = foid; tie(f->_headf, err) = wconn._wc->_open(fmt::sprintf("head/bigfile/%s", v(foid))); if (err != nil) return make_pair(nil, E(err)); bool allok = false; defer([&]() { if (!allok) f->_headf->close(); }); struct stat st; err = f->_headf->stat(&st); if (err != nil) return make_pair(nil, E(err)); f->blksize = st.st_blksize; f->_headfsize = st.st_size; if (!(f->_headfsize % f->blksize == 0)) return make_pair(nil, E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"))); wconn._filehtab[foid] = f; // start watching f string ack; tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn.at))); if (err != nil) return make_pair(nil, E(err)); if (ack != "ok") { // XXX unregister f from _filehtab return make_pair(nil, fmt::errorf("watch: %s", v(ack))); } allok = true; return make_pair(f, nil); } // close releases resources associated with FileH. // // Left fileh mappings become invalid to use except unmap. error _FileH::close() { _FileH& fileh = *this; Conn wconn = fileh.wconn; xerr::Contextf E("wcfs %s: conn @%s: close f<%s>", v(wconn->_wc->mountpoint), v(wconn->at), v(fileh.foid)); // remove fileh from wconn._filehtab // fileh.close can be called several times and after first call another // fileh could be opened for the same foid. Be careful not to erase it. wconn->_filehmu.lock(); // XXX decref open count if (wconn->_filehtab.get(fileh.foid)._ptr() == &fileh) // XXX _ptr() - or better teach `refptr<T> == T*` ? wconn->_filehtab.erase(fileh.foid); wconn->_filehmu.unlock(); return E(fileh._headf->close()); } // mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state. // // If vma != nil, created mapping is associated with that vma of user-space virtual memory manager. pair<Mapping, error> _FileH::mmap(int64_t blk_start, int64_t blk_len, VMA *vma) { _FileH& f = *this; xerr::Contextf E("wcfs %s: conn @%s: mmap f<%s> [blk%ld +blk%ld)", v(f.wconn->_wc->mountpoint), v(f.wconn->at), v(f.foid), blk_start, blk_len); // XXX (blk_start + blk_len) * blk_size overflow error err; if (blk_start < 0) panic("blk_start < 0"); if (blk_len < 0) panic("blk_len < 0"); int64_t blk_stop = blk_start + blk_len; // XXX overflow // XXX f locking? // create memory with head/f mapping and applied pins // mmap-in zeros after f.size (else access to memory after file.size will raise SIGBUS) int64_t start = blk_start*f.blksize; uint8_t *mem_start, *mem_stop; tie(mem_start, err) = mmap_ro(f._headf, start, blk_len*f.blksize); if (err != nil) return make_pair(nil, E(err)); mem_stop = mem_start + blk_len*f.blksize; int64_t stop = blk_stop*f.blksize; if (stop > f._headfsize) { uint8_t *zmem_start = mem_start + (max(f._headfsize/*XXX -1 ?*/, start) - start); err = mmap_zero_into_ro(zmem_start, mem_stop - zmem_start); if (err != nil) return make_pair(nil, E(err)); } Mapping mmap = adoptref(new _Mapping()); mmap->fileh = newref(&f); // XXX newref - simpler? mmap->blk_start = blk_start; mmap->mem_start = mem_start; mmap->mem_stop = mem_stop; mmap->vma = vma; for (auto _ : f._pinned) { // XXX keep f._pinned ↑blk and use binary search? int64_t blk = _.first; zodb::Tid rev = _.second; if (!(blk_start <= blk && blk < blk_stop)) continue; // blk out of this mapping mmap->_remmapblk(blk, rev); // XXX err? } f._mmaps.push_back(mmap); // XXX keep f._mmaps ↑blk_start if (vma != NULL) { vma->mmap_overlay_server = mmap._ptr(); // XXX +giveref } return make_pair(mmap, nil); } // resync resyncs connection and its mappings onto different database view. error _Conn::resync(zodb::Tid at) { _Conn& wconn = *this; xerr::Contextf E("wcfs %s: conn @%s: resync -> @%s", v(wconn._wc->mountpoint), v(wconn.at), v(at)); // XXX locking error err; wconn._downMu.lock(); err = wconn._downErr; wconn._downMu.unlock(); if (err != nil) return E(err); for (auto fit : wconn._filehtab) { zodb::Oid foid = fit.first; FileH f = fit.second; // XXX if file has no mappings and was not used during whole prev // cycle - forget and stop watching it // update f._headfsize and remmap to head/f zero regions that are now covered by head/f struct stat st; err = f->_headf->stat(&st); if (err != nil) return E(err); if ((size_t)st.st_blksize != f->blksize) // blksize must not change return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize)); auto headfsize = st.st_size; if (!(f->_headfsize <= headfsize)) // head/file size ↑= return E(fmt::errorf("wcfs bug: head/file size not ↑=")); if (!(headfsize % f->blksize == 0)) return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0")); for (auto mmap : f->_mmaps) { printf(" resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize); uint8_t *mem_unzero_start = min(mmap->mem_stop, mmap->mem_start + (f->_headfsize - mmap->blk_start*f->blksize)); uint8_t *mem_unzero_stop = min(mmap->mem_stop, mmap->mem_start + (headfsize - mmap->blk_start*f->blksize)); if (mem_unzero_stop - mem_unzero_start > 0) { err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize); if (err != nil) return E(err); } } f->_headfsize = headfsize; string ack; tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(at))); if (err != nil) return E(err); if (ack != "ok") { // XXX unregister f from _filehtab return E(fmt::errorf("%s", v(ack))); } } wconn.at = at; return nil; } // _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state. // // at=TidHead means unpin to head/ . // NOTE this does not check whether virtmem already mapped blk as RW. error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) { _Mapping *mmap = this; FileH f = mmap->fileh; xerr::Contextf E("wcfs %s: conn @%s: f<%s>: remmapblk #%ld @%s", v(f->wconn->_wc->mountpoint), v(f->wconn->at), v(f->foid), blk, v(at)); ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop()); error err; uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->blksize; os::File fsfile; bool fclose = false; if (at == TidHead) { fsfile = f->_headf; } else { // TODO share @rev fd until wconn is resynced? tie(fsfile, err) = f->wconn->_wc->_open( fmt::sprintf("@%s/bigfile/%s", v(at), v(f->foid))); if (err != nil) return E(err); fclose = true; } defer([&]() { if (fclose) fsfile->close(); }); struct stat st; err = fsfile->stat(&st); if (err != nil) return E(err); if ((size_t)st.st_blksize != f->blksize) return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize)); // block is beyond file size - mmap with zeros - else access to memory // after file.size will raise SIGBUS. (assumes head/f size ↑=) if ((blk+1)*f->blksize > (size_t)st.st_size) { err = mmap_zero_into_ro(blkmem, 1*f->blksize); if (err != nil) return E(err); } // block is inside file - mmap file data else { err = mmap_into_ro(blkmem, 1*f->blksize, fsfile, blk*f->blksize); if (err != nil) return E(err); } return nil; } // remmap_blk remmaps file[blk] in its place again. // virtmem calls this to remmap a block after RW dirty page was e.g. discarded. void _Mapping::remmap_blk(int64_t blk) { _Mapping& mmap = *this; // XXX locking if (!(mmap.blk_start <= blk && blk < mmap.blk_stop())) panic("remmap_blk: blk out of Mapping range"); // blkrev = rev | @head zodb::Tid blkrev; bool ok; tie(blkrev, ok) = mmap.fileh->_pinned.get_(blk); if (!ok) blkrev = TidHead; error err = mmap._remmapblk(blk, blkrev); if (err != nil) panic(v(err)); // XXX } // unmap removes mapping memory from address space. // virtmem calls this when VMA is unmapped. void _Mapping::unmap() { Mapping mmap = newref(this); // XXX newref for std::remove // XXX locking error err = mm::unmap(mmap->mem_start, mmap->mem_stop - mmap->mem_start); if (err != nil) panic("TODO"); // XXX mmap->mem_start = NULL; mmap->mem_stop = NULL; // XXX clear other fields? FileH f = mmap->fileh; //f->_mmaps.remove(mmap); // FIXME keep mmaps sorted f->_mmaps.erase( std::remove(f->_mmaps.begin(), f->_mmaps.end(), mmap), f->_mmaps.end()); } // ---- WCFS raw file access ---- // _path returns path for object on wcfs. // - str: wcfs root + obj; string WCFS::_path(const string &obj) { WCFS *wc = this; return wc->mountpoint + "/" + obj; } tuple<os::File, error> WCFS::_open(const string &path, int flags) { WCFS *wc = this; string path_ = wc->_path(path); return os::open(path_, flags); } // ---- misc ---- // mmap_zero_into_ro mmaps read-only zeros into [addr +size) so that region as all zeros. // created mapping, even after it is accessed, does not consume memory. static error mmap_zero_into_ro(void *addr, size_t size) { xerr::Contextf E("mmap zero"); // mmap /dev/zero with MAP_NORESERVE and MAP_SHARED // this way the mapping will be able to be read, but no memory will be allocated to keep it. os::File z; error err; tie(z, err) = os::open("/dev/zero"); if (err != nil) return E(err); defer([&]() { z->close(); }); err = mm::map_into(addr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, z, 0); if (err != nil) return E(err); return nil; } // mmap_ro mmaps read-only fd[offset +size). // The mapping is created with MAP_SHARED. static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size) { return mm::map(PROT_READ, MAP_SHARED, f, offset, size); } // mmap_into_ro mmaps read-only fd[offset +size) into [addr +size). // The mapping is created with MAP_SHARED. static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset) { return mm::map_into(addr, size, PROT_READ, MAP_SHARED, f, offset); } _Conn::_Conn() {} _Conn::~_Conn() {} void _Conn::decref() { if (__decref()) delete this; } _FileH::_FileH() {} _FileH::~_FileH() {} void _FileH::decref() { if (__decref()) delete this; } _Mapping::_Mapping() {} _Mapping::~_Mapping() {} void _Mapping::decref() { if (__decref()) delete this; } dict<int64_t, zodb::Tid> _tfileh_pinned(FileH fileh) { return fileh->_pinned; } } // wcfs::