// Copyright (C) 2018-2020  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Package wcfs provides WCFS client integrated with user-space virtual memory manager.
// See wcfs.h for package overview.

// Wcfs client organization
//
// - need to maintain pinner map registry
//
//      [blk< +len) -> vma      ↓blk
//      #blk        -> []vma covering it
//
// - we can reuse virtmem code:
//
//   XXX review text
//
//   if RW page is added to mapping, pinner does not need to split the mmap -
//   it just needs to check before overwriting that page with @revX/data
//   whether RAM page was mmaped by virtmem or not. If RW page is already
//   there, pinner does not overwrite it.
//
//   RW -> Uptodate: Virtmem calls pinner to remmap the page RO after commit or
//   abort.
//
//
//  VMA     -> fileh -> file
//   ↑↓          ↑↓ (XXX here ?)
// Mapping     FileH
//

#include "wcfs_misc.h"
#include "wcfs.h"
#include "wcfs_watchlink.h"

#include <wendelin/bigfile/virtmem.h>
#include <wendelin/bigfile/ram.h>

#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>

#include <algorithm>
#include <string>
#include <vector>

#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

using std::min;
using std::max;
using std::vector;

// wcfs::
namespace wcfs {

// forward declarations of mmap helpers; they are defined in the "misc" section at the end of this file.
static error mmap_zero_into_ro(void *addr, size_t size);
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size);
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);

// connect creates new Conn viewing WCFS state as of @at.
//
// It first opens a watch link to wcfs, then initializes the connection and
// spawns the pinner task under a cancellable context. That context is
// cancelled by _Conn::close.
pair<Conn, error> WCFS::connect(zodb::Tid at) {
    WCFS *self = this;
    xerr::Contextf E("wcfs %s: connect @%s", v(self->mountpoint), v(at));

    // TODO support !isolated mode

    // without watch link there is nothing to connect - open it first
    WatchLink link;
    error     werr;
    tie(link, werr) = self->_openwatch();
    if (werr != nil)
        return make_pair(nil, E(werr));

    // connection object
    Conn conn = adoptref(new _Conn());
    conn->_wc    = self;
    conn->at     = at;
    conn->_wlink = link;

    // pinner runs in its own work group; _pinCancel is how close stops it
    context::Context ctxPin;
    tie(ctxPin, conn->_pinCancel) = context::with_cancel(context::background());
    conn->_pinWG = sync::NewWorkGroup(ctxPin);
    conn->_pinWG->go([conn](context::Context ctx) -> error {
        return conn->_pinner(ctx);
    });

    return make_pair(conn, nil);
}


// errConnClosed is what _Conn::close sets wconn._downErr to.
// __pinner compares _downErr against it to tell an EOF caused by our own close
// apart from an unexpected EOF.
static global<error> errConnClosed = errors::New("connection closed");

// close releases resources associated with wconn.
//
// After close, opened fileh and mappings become invalid to use except close and unmap.
// Only the first error encountered is returned; later ones are dropped.
error _Conn::close() {
    _Conn& wconn = *this;
    xerr::Contextf E("wcfs %s: close conn @%s", v(wconn._wc->mountpoint), v(wconn.at));
    // XXX + conn # e.g. from wconn._wlink.id? or wlink.close should include its id itself?

    // first remembers the first non-nil error given to keep.
    error first;
    auto keep = [&first](error e) {
        if (first == nil && e != nil)
            first = e;
    };

    // mark wconn as down; the pinner then treats the coming EOF as expected.
    wconn._downMu.lock();
    wconn._downErr = errConnClosed; // XXX ok to change even if it was !nil before?
    wconn._downMu.unlock();

    // shut down the watch link and stop the pinner
    keep(wconn._wlink->close());
    wconn._pinCancel();
    error pinErr = wconn._pinWG->wait();
    if (!errors::Is(pinErr, context::canceled)) // canceled is the expected outcome
        keep(pinErr);

    // close all files - both those that have no mappings and those that still have mappings.
    //
    // NOTE after a file is closed its mappings could continue to exist, but we can
    // no longer maintain a consistent view. For this reason mappings should be
    // changed to something that gives EFAULT on access. XXX implement
    wconn._filehmu.lock();
    defer([&]() {
        wconn._filehmu.unlock();
    });

    for (auto entry : wconn._filehtab) {
        auto fileh = entry.second;
        keep(fileh->_headf->close()); // XXX mark fileh as down so that fileh.close does not say "bad fd"
        fileh->_headf = nil;

        // XXX stop watching fileh
    }
    wconn._filehtab.clear();

    return E(first);
}

// _pinner is the body of the pinner task.
//
// It runs __pinner, which receives pin messages from wcfs and adjusts wconn
// mappings, and then handles the outcome:
// - an error other than cancellation is reported as CRITICAL, both via log
//   and on stderr (cancellation = .close asks pinner to stop);
// - if wconn is not yet marked down, it is marked down with the pinner
//   error as the cause.
error _Conn::_pinner(context::Context ctx) {
    _Conn& wconn = *this;
    error perr = wconn.__pinner(ctx);

    bool stoppedNormally = (perr == nil) || errors::Is(perr, context::canceled); // canceled = .close asks pinner to stop
    if (!stoppedNormally) {
        // if pinner fails, wcfs will kill us.
        // report the error so that it is not hidden; print it to stderr as well,
        // since by default log does not print there.
        // XXX also catch panic/exc ?
        log::Fatalf("CRITICAL: %s", v(perr));
        log::Fatalf("CRITICAL: wcfs server will likely kill us soon.");
        fprintf(stderr, "CRITICAL: %s\n", v(perr));
        fprintf(stderr, "CRITICAL: wcfs server will likely kill us soon.\n");
    }

    // the pinner is gone - mark the connection non-operational
    wconn._downMu.lock();
    if (wconn._downErr == nil) {
        wconn._downErr = fmt::errorf("no longer operational due to: %w", perr);
        // XXX make all fileh and mapping invalid.
    }
    wconn._downMu.unlock();

    return perr;
}

// __pinner is the pinner's main loop.
//
// It receives pin requests from wcfs via the watch link and handles them one
// by one until an error occurs. EOF on the watch link is treated as normal
// termination (nil is returned) only if the connection was closed by us.
// Otherwise it is reported as io::ErrUnexpectedEOF.
error _Conn::__pinner(context::Context ctx) {
    _Conn& wconn = *this;
    xerr::Contextf E("pinner"); // NOTE pinner error goes to Conn::close who has its own context

    PinReq req;
    for (;;) {
        error err = wconn._wlink->recvReq(ctx, &req);
        if (err == nil) {
            // we received request to pin/unpin file block. handle it
            err = wconn._pin1(&req);
            if (err == nil)
                continue;
            return E(err);
        }

        // it is ok if we receive EOF due to us closing the connection
        if (err == io::EOF_) {
            wconn._downMu.lock();
            bool closedByUs = (wconn._downErr == errConnClosed);
            wconn._downMu.unlock();
            err = closedByUs ? nil : io::ErrUnexpectedEOF;
        }
        return E(err);
    }
}

// _pin1 handles one pin request received from wcfs and replies to it.
//
// The reply is "ack" on success, or "nak: <error>" if handling failed.
// An error from handling the request takes precedence over an error from
// sending the reply.
error _Conn::_pin1(PinReq *req) {
    _Conn& wconn = *this;
    xerr::Contextf E("pin f<%s> #%ld @%s", v(req->foid), req->blk, v(req->at));

    error herr = wconn.__pin1(req);

    string reply = (herr == nil) ? string("ack") : fmt::sprintf("nak: %s", v(herr));
    error  rerr  = wconn._wlink->replyReq(context::background(), req, reply);    // XXX ctx ok?

    return E(herr != nil ? herr : rerr);
}

// __pin1 applies one pin request to all mappings of the requested file and
// records the new pin state in f._pinned.
//
// For every mapping covering req->blk, the block memory is remmapped to
// @req->at (or unpinned to head/ when req->at == TidHead). This is skipped
// when virtmem has already dirtied the page for that block.
//
// wconn._filehmu is held for the whole operation. It is released via defer
// on every return path, including the error return from remmapping.
// Previously that path leaked the lock.
error _Conn::__pin1(PinReq *req) {
    _Conn& wconn = *this;

    FileH f;
    bool  ok;
    wconn._filehmu.lock();
    defer([&]() {
        wconn._filehmu.unlock();
    });

    tie(f, ok) = wconn._filehtab.get_(req->foid);
    if (!ok)
        return fmt::errorf("unexpected pin: file not watched"); // why wcfs sent us this update?

    // XXX relock wconn -> f

    for (auto mmap : f->_mmaps) {    // XXX use ↑blk_start for binary search
        if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
            continue;   // blk ∉ mmap

        //trace("\tremmapblk %d @%s" % (req->blk, (v(req.at) if req.at else "head")))

        // pin only if virtmem did not dirty the page corresponding to this block already.
        // if virtmem dirtied the page - it will ask us to remmap it again after commit or abort.
        bool do_pin = true;
        error err;
        if (mmap->vma != NULL) {
            virt_lock();
            BigFileH *virt_fileh = mmap->vma->fileh;
            TODO (mmap->fileh->blksize != virt_fileh->ramh->ram->pagesize);
            do_pin = !__fileh_page_isdirty(virt_fileh, req->blk);
        }

        if (do_pin)
            err = mmap->_remmapblk(req->blk, req->at);

        if (mmap->vma != NULL)
            virt_unlock();

        // on error don't need to continue with other mappings - all fileh and
        // all mappings become marked invalid on pinner failure.
        // wconn._filehmu is released by the deferred unlock above.
        // XXX all call wconn._down from here under wconn._filehmu lock?
        if (err != nil)
            return err;

        //trace("\t-> remmaped");   XXX
    }

    // update f._pinned
    // XXX do it before ^^^ remmapblk (so that e.g. concurrent
    // discard/writeout see correct f._pinned) ?
    if (req->at == TidHead)
        f->_pinned.erase(req->blk);      // unpin to @head
    else
        f->_pinned[req->blk] = req->at;

    return nil;
}

// open opens FileH corresponding to ZBigFile foid.
//
// If foid is already opened in wconn, the existing FileH is returned.
// Otherwise head/bigfile/<foid> is opened on wcfs, and the new FileH is
// registered in wconn._filehtab. wcfs is then asked to watch the file @wconn.at.
//
// On any failure after head/f was opened, head/f is closed and the FileH is
// removed from wconn._filehtab again. This keeps a half-initialized handle
// (with closed head/f) from being returned by subsequent open calls.
pair<FileH, error> _Conn::open(zodb::Oid foid) {
    _Conn& wconn = *this;
    error err;
    xerr::Contextf E("wcfs %s: conn @%s: open f<%s>", v(wconn._wc->mountpoint), v(wconn.at), v(foid));

    wconn._filehmu.lock();
    defer([&]() {
        wconn._filehmu.unlock();
    });

    // _downErr is protected by _downMu - the pinner and close modify it under _downMu.
    wconn._downMu.lock();
    err = wconn._downErr;
    wconn._downMu.unlock();
    if (err != nil)
        return make_pair(nil, E(err));

    FileH f; bool ok;
    tie(f, ok) = wconn._filehtab.get_(foid);
    if (ok) {
        // XXX incref open count
        return make_pair(f, nil);
    }

    // TODO perform open with filehmu released

    f = adoptref(new _FileH());
    f->wconn    = newref(&wconn);    // XXX newref -> simpler?
    f->foid     = foid;
    tie(f->_headf, err)
                = wconn._wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
    if (err != nil)
        return make_pair(nil, E(err));

    // on failure undo what was done below: unregister f and close head/f.
    // NOTE this defer runs before the one releasing _filehmu, i.e. still under the lock.
    bool allok      = false;
    bool registered = false;
    defer([&]() {
        if (allok)
            return;
        if (registered)
            wconn._filehtab.erase(foid);
        f->_headf->close();
    });

    struct stat st;
    err = f->_headf->stat(&st);
    if (err != nil)
        return make_pair(nil, E(err));
    f->blksize    = st.st_blksize;
    f->_headfsize = st.st_size;
    if (!(f->_headfsize % f->blksize == 0))
        return make_pair(nil, E(fmt::errorf("wcfs bug: head/file size %% blksize != 0")));

    wconn._filehtab[foid] = f;
    registered = true;

    // start watching f
    string ack;
    tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn.at)));
    if (err != nil)
        return make_pair(nil, E(err));
    if (ack != "ok")
        return make_pair(nil, E(fmt::errorf("watch: %s", v(ack))));

    allok = true;
    return make_pair(f, nil);
}

// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
//
// _Conn::close closes head/f of every opened file and sets fileh._headf to nil.
// When fileh.close is called after that, there is nothing left to release,
// so close returns nil instead of dereferencing a nil head/f.
error _FileH::close() {
    _FileH& fileh = *this;
    Conn    wconn = fileh.wconn;
    xerr::Contextf E("wcfs %s: conn @%s: close f<%s>", v(wconn->_wc->mountpoint), v(wconn->at), v(fileh.foid));

    // remove fileh from wconn._filehtab
    // fileh.close can be called several times and after first call another
    // fileh could be opened for the same foid. Be careful not to erase it.
    wconn->_filehmu.lock();
    // XXX decref open count
    if (wconn->_filehtab.get(fileh.foid)._ptr() == &fileh) // XXX _ptr() - or better teach `refptr<T> == T*` ?
        wconn->_filehtab.erase(fileh.foid);
    // read _headf under _filehmu: _Conn::close modifies it under that lock.
    os::File headf = fileh._headf;
    wconn->_filehmu.unlock();

    // head/f was already closed and cleared by wconn.close
    if (headf == nil)
        return nil;

    return E(headf->close());
}

// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
//
// If vma != nil, created mapping is associated with that vma of user-space virtual memory manager.
//
// Memory beyond head/f size is mmapped to zeros. Blocks recorded in
// f._pinned are remmapped to their pinned revisions. If any step fails, the
// memory mmapped so far is released, and the mapping is registered neither in
// f._mmaps nor in vma.
pair<Mapping, error> _FileH::mmap(int64_t blk_start, int64_t blk_len, VMA *vma) {
    _FileH& f = *this;
    xerr::Contextf E("wcfs %s: conn @%s: mmap f<%s> [blk%ld +blk%ld)",
            v(f.wconn->_wc->mountpoint), v(f.wconn->at), v(f.foid), blk_start, blk_len);

    // XXX (blk_start + blk_len) * blk_size overflow
    error err;

    if (blk_start < 0)
        panic("blk_start < 0");
    if (blk_len < 0)
        panic("blk_len < 0");

    int64_t blk_stop = blk_start + blk_len;     // XXX overflow

    // XXX f locking?

    // create memory with head/f mapping and applied pins
    // mmap-in zeros after f.size (else access to memory after file.size will raise SIGBUS)
    int64_t start = blk_start*f.blksize;
    uint8_t *mem_start, *mem_stop;
    tie(mem_start, err) = mmap_ro(f._headf, start, blk_len*f.blksize);
    if (err != nil)
        return make_pair(nil, E(err));
    mem_stop = mem_start + blk_len*f.blksize;

    // if anything below fails - release the memory so that it does not leak.
    bool allok = false;
    defer([&]() {
        if (!allok)
            mm::unmap(mem_start, mem_stop - mem_start); // best effort
    });

    int64_t stop = blk_stop*f.blksize;
    if (stop > f._headfsize) {
        uint8_t *zmem_start = mem_start + (max(f._headfsize/*XXX -1 ?*/, start) - start);
        err = mmap_zero_into_ro(zmem_start, mem_stop - zmem_start);
        if (err != nil)
            return make_pair(nil, E(err));
    }

    Mapping mmap = adoptref(new _Mapping());
    mmap->fileh     = newref(&f);   // XXX newref - simpler?
    mmap->blk_start = blk_start;
    mmap->mem_start = mem_start;
    mmap->mem_stop  = mem_stop;
    mmap->vma       = vma;

    // apply pins that fall into this mapping
    for (auto _ : f._pinned) {  // XXX keep f._pinned ↑blk and use binary search?
        int64_t    blk = _.first;
        zodb::Tid  rev = _.second;
        if (!(blk_start <= blk && blk < blk_stop))
            continue;   // blk out of this mapping
        err = mmap->_remmapblk(blk, rev);
        if (err != nil)
            return make_pair(nil, E(err));
    }

    f._mmaps.push_back(mmap);    // XXX keep f._mmaps ↑blk_start

    if (vma != NULL) {
        vma->mmap_overlay_server = mmap._ptr(); // XXX +giveref
    }

    allok = true;
    return make_pair(mmap, nil);
}

// resync resyncs connection and its mappings onto different database view.
//
// For every opened file it:
// - re-reads head/f size, which must be ↑=;
// - remmaps to head/f those parts of mappings that were mmapped to zeros
//   because they were beyond the old head/f size, but are now covered by it;
// - re-sends the watch request for the new @at.
error _Conn::resync(zodb::Tid at) {
    _Conn& wconn = *this;
    xerr::Contextf E("wcfs %s: conn @%s: resync -> @%s", v(wconn._wc->mountpoint), v(wconn.at), v(at));

    // XXX locking
    error err;

    wconn._downMu.lock();
    err = wconn._downErr;
    wconn._downMu.unlock();
    if (err != nil)
        return E(err);

    for (auto fit : wconn._filehtab) {
        zodb::Oid  foid = fit.first;
        FileH      f    = fit.second;

        // XXX if file has no mappings and was not used during whole prev
        // cycle - forget and stop watching it

        // update f._headfsize and remmap to head/f zero regions that are now covered by head/f
        struct stat st;
        err = f->_headf->stat(&st);
        if (err != nil)
            return E(err);

        if ((size_t)st.st_blksize != f->blksize)    // blksize must not change
            return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));
        auto headfsize = st.st_size;
        if (!(f->_headfsize <= headfsize))   // head/file size ↑=
            return E(fmt::errorf("wcfs bug: head/file size not ↑="));
        if (!(headfsize % f->blksize == 0))
            return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"));

        // Do the arithmetic below on signed file offsets. Mixing signed sizes with
        // size_t blksize wraps around for mappings that start beyond the old head/f
        // size, and yields addresses before mem_start. mmapping over those would
        // clobber memory outside of the mapping.
        int64_t blksize = static_cast<int64_t>(f->blksize);
        int64_t oldsize = f->_headfsize;
        int64_t newsize = headfsize;
        for (auto mmap : f->_mmaps) {
            //trace("  resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize);

            // file range [mmap_start, mmap_stop) covered by this mapping
            int64_t mmap_start = mmap->blk_start * blksize;
            int64_t mmap_stop  = mmap_start + (mmap->mem_stop - mmap->mem_start);

            // part of it that was beyond old head/f size (thus mmapped to zeros),
            // but is now covered by head/f data
            int64_t unzero_start = max(oldsize, mmap_start);
            int64_t unzero_stop  = min(newsize, mmap_stop);
            if (unzero_stop <= unzero_start)
                continue;

            // memory address and file offset must correspond to each other
            err = mmap_into_ro(mmap->mem_start + (unzero_start - mmap_start),
                               unzero_stop - unzero_start, f->_headf, unzero_start);
            if (err != nil)
                return E(err);
        }

        f->_headfsize = headfsize;


        string ack;
        tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(at)));
        if (err != nil)
            return E(err);
        if (ack != "ok") {
            // XXX unregister f from _filehtab
            return E(fmt::errorf("%s", v(ack)));
        }
    }

    wconn.at = at;
    return nil;
}

// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=TidHead means unpin to head/ ; any other at takes data from @<at>/bigfile/<foid>.
// A block lying beyond the size of the file it is taken from is mmapped to zeros.
// NOTE this does not check whether virtmem already mapped blk as RW.
error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
    _Mapping *mmap = this;
    FileH f = mmap->fileh;
    xerr::Contextf E("wcfs %s: conn @%s: f<%s>: remmapblk #%ld @%s",
            v(f->wconn->_wc->mountpoint), v(f->wconn->at), v(f->foid), blk, v(at));

    ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop());
    error err;

    // memory where file[blk] lives inside this mapping
    uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->blksize;

    // pick the file to take block data from: head/f belongs to fileh, while
    // @at/f is opened here and closed when we return.
    bool     atHead = (at == TidHead);
    os::File src;
    if (atHead) {
        src = f->_headf;
    } else {
        // TODO share @rev fd until wconn is resynced?
        tie(src, err) = f->wconn->_wc->_open(
                fmt::sprintf("@%s/bigfile/%s", v(at), v(f->foid)));
        if (err != nil)
            return E(err);
    }
    defer([&]() {
        if (!atHead)
            src->close();
    });

    struct stat st;
    err = src->stat(&st);
    if (err != nil)
        return E(err);
    if ((size_t)st.st_blksize != f->blksize)
        return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));

    // a block beyond file size is mmapped with zeros - else access to that
    // memory would raise SIGBUS. (assumes head/f size ↑=)
    bool beyondEOF = (blk+1)*f->blksize > (size_t)st.st_size;
    if (beyondEOF)
        err = mmap_zero_into_ro(blkmem, 1*f->blksize);
    else
        err = mmap_into_ro(blkmem, 1*f->blksize, src, blk*f->blksize);
    if (err != nil)
        return E(err);

    return nil;
}

// remmap_blk remmaps file[blk] in its place again.
// virtmem calls this to remmap a block after RW dirty page was e.g. discarded.
void _Mapping::remmap_blk(int64_t blk) {
    _Mapping& mmap = *this;

    // XXX locking
    if (!(mmap.blk_start <= blk && blk < mmap.blk_stop()))
        panic("remmap_blk: blk out of Mapping range");

    // blkrev = rev | @head
    zodb::Tid blkrev; bool ok;
    tie(blkrev, ok) = mmap.fileh->_pinned.get_(blk);
    if (!ok)
        blkrev = TidHead;

    error err = mmap._remmapblk(blk, blkrev);
    if (err != nil)
        panic(v(err)); // XXX
}

// unmap removes mapping memory from address space.
// virtmem calls this when VMA is unmapped.
void _Mapping::unmap() {
    Mapping mmap = newref(this);    // XXX newref for std::remove

    // XXX locking

    error err = mm::unmap(mmap->mem_start, mmap->mem_stop - mmap->mem_start);
    if (err != nil)
        panic("TODO");  // XXX
    mmap->mem_start = NULL;
    mmap->mem_stop  = NULL;
    // XXX clear other fields?

    FileH f = mmap->fileh;
    //f->_mmaps.remove(mmap);

    // FIXME keep mmaps sorted
    f->_mmaps.erase(
        std::remove(f->_mmaps.begin(), f->_mmaps.end(), mmap),
        f->_mmaps.end());
}


// ---- WCFS raw file access ----

// _path returns the path of object obj on wcfs:
// the wcfs mountpoint, followed by "/", followed by obj.
string WCFS::_path(const string &obj) {
    string full = this->mountpoint;
    full += "/";
    full += obj;
    return full;
}

// _open opens the wcfs object at path (relative to the wcfs mountpoint, see _path)
// with the given open flags.
tuple<os::File, error> WCFS::_open(const string &path, int flags) {
    return os::open(this->_path(path), flags);
}


// ---- misc ----

// mmap_zero_into_ro mmaps read-only zeros into [addr +size) so that the region reads as all zeros.
// The created mapping, even after it is accessed, does not consume memory.
static error mmap_zero_into_ro(void *addr, size_t size) {
    xerr::Contextf E("mmap zero");

    // /dev/zero mmapped with MAP_SHARED | MAP_NORESERVE can be read,
    // but no memory is allocated to back it.
    os::File zero;
    error    zerr;
    tie(zero, zerr) = os::open("/dev/zero");
    if (zerr != nil)
        return E(zerr);
    defer([&]() {
        zero->close();
    });

    zerr = mm::map_into(addr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, zero, 0);
    if (zerr != nil)
        return E(zerr);
    return nil;
}

// mmap_ro mmaps read-only fd[offset +size).
// The mapping is created with MAP_SHARED.
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size) {
    return mm::map(PROT_READ, MAP_SHARED, f, offset, size);
}

// mmap_into_ro mmaps read-only fd[offset +size) into [addr +size).
// The mapping is created with MAP_SHARED.
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset) {
    return mm::map_into(addr, size, PROT_READ, MAP_SHARED, f, offset);
}


// _Conn, _FileH and _Mapping are reference-counted objects.
// Their constructors and destructors are empty. decref deletes the object
// when __decref() returns true (presumably when the last reference was dropped - confirm).
_Conn::_Conn()  {}
_Conn::~_Conn() {}
void _Conn::decref() {
    if (__decref())
        delete this;
}

_FileH::_FileH()  {}
_FileH::~_FileH() {}
void _FileH::decref() {
    if (__decref())
        delete this;
}

_Mapping::_Mapping()  {}
_Mapping::~_Mapping() {}
void _Mapping::decref() {
    if (__decref())
        delete this;
}


// _tfileh_pinned returns a copy of fileh._pinned (blk -> revision the block is pinned to).
// NOTE(review): presumably exported for tests (the "_t" prefix) - confirm.
dict<int64_t, zodb::Tid> _tfileh_pinned(FileH fileh) {
    return fileh->_pinned;
}

}   // wcfs::