Commit 0ec8ce51 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e728f5db
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# distutils: language=c++
# Package _wcfs provides Python-wrappers for C++ wcfs client package.
#
# It wraps WCFS/Conn/FileH/Mapping and WatchLink to help client_test.py unit-test
# WCFS base-layer mmap functionality. At functional level WCFS client (and especially
# pinner) is verified when running wendelin.core array tests in wcfs mode.
from golang cimport chan, structZ, string, error, refptr
from golang cimport context, cxx
from libc.stdint cimport int64_t, uint64_t, uint8_t
from libcpp.utility cimport pair
from libcpp.vector cimport vector
cdef extern from "wcfs/client/wcfs_misc.h" namespace "zodb" nogil:
ctypedef uint64_t Tid
ctypedef uint64_t Oid
cdef extern from "wcfs/client/wcfs_misc.h" namespace "wcfs" nogil:
const Tid TidHead
# pyx/nogil description for C++ classes
cdef extern from "wcfs/client/wcfs_watchlink.h" namespace "wcfs" nogil:
cppclass _WatchLink:
error close()
error closeWrite()
pair[string, error] sendReq(context.Context ctx, const string &req)
error recvReq(context.Context ctx, PinReq *prx)
error replyReq(context.Context ctx, const PinReq *req, const string& reply);
vector[string] fatalv
chan[structZ] rx_eof
cppclass WatchLink (refptr[_WatchLink]):
# WatchLink.X = WatchLink->X in C++
error close "_ptr()->close" ()
error closeWrite "_ptr()->closeWrite"()
pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req)
error recvReq "_ptr()->recvReq" (context.Context ctx, PinReq *prx)
error replyReq "_ptr()->replyReq" (context.Context ctx, const PinReq *req, const string& reply);
vector[string] fatalv "_ptr()->fatalv"
chan[structZ] rx_eof "_ptr()->rx_eof"
cppclass PinReq:
Oid foid
int64_t blk
Tid at
string msg
error _twlinkwrite(WatchLink wlink, const string& pkt)
cdef extern from "wcfs/client/wcfs.h" namespace "wcfs" nogil:
cppclass WCFS:
string mountpoint
pair[WatchLink, error] _openwatch()
pair[Conn, error] connect(Tid at)
cppclass _Conn:
pair[FileH, error] open(Oid foid)
error close()
error resync(Tid at)
cppclass Conn (refptr[_Conn]):
# Conn.X = Conn->X in C++
pair[FileH, error] open "_ptr()->open" (Oid foid)
error close "_ptr()->close" ()
error resync "_ptr()->resync" (Tid at)
cppclass _FileH:
size_t blksize
error close()
pair[Mapping, error] mmap(int64_t blk_start, int64_t blk_len) # `VMA *vma=nil` not exposed
cppclass FileH (refptr[_FileH]):
# FileH.X = FileH->X in C++
size_t blksize "_ptr()->blksize"
error close "_ptr()->close" ()
pair[Mapping, error] mmap "_ptr()->mmap" (int64_t blk_start, int64_t blk_len)
cppclass _Mapping:
FileH fileh
int64_t blk_start
int64_t blk_stop() const
uint8_t *mem_start
uint8_t *mem_stop
error unmap()
cppclass Mapping (refptr[_Mapping]):
# Mapping.X = Mapping->X in C++
FileH fileh "_ptr()->fileh"
int64_t blk_start "_ptr()->blk_start"
int64_t blk_stop "_ptr()->blk_stop" () const
uint8_t *mem_start "_ptr()->mem_start"
uint8_t *mem_stop "_ptr()->mem_stop"
error unmap "_ptr()->unmap" ()
cxx.dict[int64_t, Tid] _tfileh_pinned(FileH wfileh)
# ---- python bits ----
cdef class PyWCFS:
cdef WCFS wc
cdef class PyConn:
cdef Conn wconn
cdef class PyFileH:
cdef FileH wfileh
cdef class PyMapping:
cdef Mapping wmmap
cdef readonly PyFileH fileh
cdef class PyWatchLink:
cdef WatchLink wlink
cdef class PyPinReq:
cdef PinReq pinreq
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# cython: auto_pickle=False
# distutils: language=c++
# Package _wcfs provides Python-wrappers for C++ wcfs client package.
# See _wcfs.pxd for package overview.
from golang cimport pychan, pyerror, nil
from golang cimport io
cdef extern from *:
ctypedef bint cbool "bool"
from ZODB.utils import p64, u64
from cpython cimport PyBuffer_FillInfo
from libcpp.unordered_map cimport unordered_map
cdef class PyWCFS:
property mountpoint:
def __get__(PyWCFS pywc):
return pywc.wc.mountpoint
def __set__(PyWCFS pywc, string v):
pywc.wc.mountpoint = v
def connect(PyWCFS pywc, pyat): # -> PyConn
cdef Tid at = u64(pyat)
with nogil:
_ = wcfs_connect_pyexc(&pywc.wc, at)
wconn = _.first
err = _.second
if err != nil:
raise pyerr(err)
cdef PyConn pywconn = PyConn.__new__(PyConn)
pywconn.wconn = wconn
return pywconn
cdef class PyConn:
def __dealloc__(PyConn pywconn):
pywconn.wconn = nil
def close(PyConn pywconn):
with nogil:
err = wconn_close_pyexc(pywconn.wconn)
if err != nil:
raise pyerr(err)
def open(PyConn pywconn, pyfoid): # -> FileH
cdef Oid foid = u64(pyfoid)
with nogil:
_ = wconn_open_pyexc(pywconn.wconn, foid)
wfileh = _.first
err = _.second
if err != nil:
raise pyerr(err)
cdef PyFileH pywfileh = PyFileH.__new__(PyFileH)
pywfileh.wfileh = wfileh
return pywfileh
def resync(PyConn pywconn, pyat):
cdef Tid at = u64(pyat)
with nogil:
err = wconn_resync_pyexc(pywconn.wconn, at)
if err != nil:
raise pyerr(err)
cdef class PyFileH:
def __dealloc__(PyFileH pywfileh):
pywfileh.wfileh = nil
def close(PyFileH pywfileh):
with nogil:
err = wfileh_close_pyexc(pywfileh.wfileh)
if err != nil:
raise pyerr(err)
def mmap(PyFileH pywfileh, int64_t blk_start, int64_t blk_len):
with nogil:
_ = wfileh_mmap_pyexc(pywfileh.wfileh, blk_start, blk_len)
wmmap = _.first
err = _.second
if err != nil:
raise pyerr(err)
assert wmmap.fileh .eq (pywfileh.wfileh)
cdef PyMapping pywmmap = PyMapping.__new__(PyMapping)
pywmmap.wmmap = wmmap
pywmmap.fileh = pywfileh
return pywmmap
property blksize:
def __get__(PyFileH pywfileh):
return pywfileh.wfileh.blksize
# XXX for tests
property pinned:
def __get__(PyFileH pywfileh):
# XXX cast: needed for cython to automatically convert to py dict
cdef dict p = <unordered_map[int64_t, Tid]> _tfileh_pinned(pywfileh.wfileh)
for blk in p:
p[blk] = p64(p[blk]) # rev(int64) -> rev(bytes)
return p
cdef class PyMapping:
def __dealloc__(PyMapping pywmmap):
# unmap just in case (double unmap is ok)
with nogil:
err = wmmap_unmap_pyexc(pywmmap.wmmap)
pywmmap.wmmap = nil
if err != nil:
raise pyerr(err)
property blk_start:
def __get__(PyMapping pywmmap):
return pywmmap.wmmap.blk_start
property blk_stop:
def __get__(PyMapping pywmmap):
return pywmmap.wmmap.blk_stop()
def __getbuffer__(PyMapping pywmmap, Py_buffer *view, int flags):
PyBuffer_FillInfo(view, pywmmap, pywmmap.wmmap.mem_start,
pywmmap.wmmap.mem_stop - pywmmap.wmmap.mem_start, readonly=1, flags=flags)
property mem:
def __get__(PyMapping pywmmap) -> memoryview:
return memoryview(pywmmap)
def unmap(PyMapping pywmmap):
with nogil:
err = wmmap_unmap_pyexc(pywmmap.wmmap)
if err != nil:
raise pyerr(err)
# ----------------------------------------
cdef class PyWatchLink:
def __init__(PyWatchLink pywlink, PyWCFS pywc):
with nogil:
_ = wcfs_openwatch_pyexc(&pywc.wc)
pywlink.wlink = _.first
err = _.second
if err != nil:
raise pyerr(err)
def __dealloc__(PyWatchLink pywlink):
pywlink.wlink = nil
def close(PyWatchLink pywlink):
with nogil:
err = wlink_close_pyexc(pywlink.wlink)
if err != nil:
raise pyerr(err)
def closeWrite(PyWatchLink pywlink):
with nogil:
err = wlink_closeWrite_pyexc(pywlink.wlink)
if err != nil:
raise pyerr(err)
def sendReq(PyWatchLink pywlink, context.PyContext pyctx, string req): # -> reply(string)
with nogil:
_ = wlink_sendReq_pyexc(pywlink.wlink, pyctx.ctx, req)
reply = _.first
err = _.second
if err != nil:
raise pyerr(err)
return reply
def recvReq(PyWatchLink pywlink, context.PyContext pyctx): # -> PinReq | None when EOF
cdef PyPinReq pyreq = PyPinReq.__new__(PyPinReq)
with nogil:
err = wlink_recvReq_pyexc(pywlink.wlink, pyctx.ctx, &pyreq.pinreq)
if err.eq(io.EOF):
return None
if err != nil:
raise pyerr(err)
return pyreq
def replyReq(PyWatchLink pywlink, context.PyContext pyctx, PyPinReq pyreq, string reply):
with nogil:
err = wlink_replyReq_pyexc(pywlink.wlink, pyctx.ctx, &pyreq.pinreq, reply)
if err != nil:
raise pyerr(err)
return
# XXX for tests
property fatalv:
def __get__(PyWatchLink pywlink):
return pywlink.wlink.fatalv
property rx_eof:
def __get__(PyWatchLink pywlink):
return pychan.from_chan_structZ(pywlink.wlink.rx_eof)
cdef class PyPinReq:
property foid:
def __get__(PyPinReq pypin):
return p64(pypin.pinreq.foid)
property blk:
def __get__(PyPinReq pypin):
return pypin.pinreq.blk
property at:
def __get__(PyPinReq pypin):
at = pypin.pinreq.at
if at == TidHead:
return None
return p64(at)
# wcfs_test.py uses req.msg in several places
property msg:
def __get__(PyPinReq pypin):
return pypin.pinreq.msg
def _tpywlinkwrite(PyWatchLink pywlink, bytes pypkt):
cdef string pkt = pypkt
with nogil:
err = _twlinkwrite_pyexc(pywlink.wlink, pkt)
if err != nil:
raise pyerr(err)
# ---- misc ----
# pyerr converts error into python error.
cdef object pyerr(error err):
return pyerror.from_error(err)
from golang cimport topyexc
cdef nogil:
pair[WatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc:
return wcfs._openwatch()
pair[Conn, error] wcfs_connect_pyexc(WCFS *wcfs, Tid at) except +topyexc:
return wcfs.connect(at)
error wconn_close_pyexc(Conn wconn) except +topyexc:
return wconn.close()
pair[FileH, error] wconn_open_pyexc(Conn wconn, Oid foid) except +topyexc:
return wconn.open(foid)
error wconn_resync_pyexc(Conn wconn, Tid at) except +topyexc:
return wconn.resync(at)
error wfileh_close_pyexc(FileH wfileh) except +topyexc:
return wfileh.close()
pair[Mapping, error] wfileh_mmap_pyexc(FileH wfileh, int64_t blk_start, int64_t blk_len) except +topyexc:
return wfileh.mmap(blk_start, blk_len)
error wmmap_unmap_pyexc(Mapping wmmap) except +topyexc:
return wmmap.unmap()
error wlink_close_pyexc(WatchLink wlink) except +topyexc:
return wlink.close()
error wlink_closeWrite_pyexc(WatchLink wlink) except +topyexc:
return wlink.closeWrite()
pair[string, error] wlink_sendReq_pyexc(WatchLink wlink, context.Context ctx, const string &req) except +topyexc:
return wlink.sendReq(ctx, req)
error wlink_recvReq_pyexc(WatchLink wlink, context.Context ctx, PinReq *prx) except +topyexc:
return wlink.recvReq(ctx, prx)
error wlink_replyReq_pyexc(WatchLink wlink, context.Context ctx, const PinReq *req, const string& reply) except +topyexc:
return wlink.replyReq(ctx, req, reply)
error _twlinkwrite_pyexc(WatchLink wlink, const string& pkt) except +topyexc:
return _twlinkwrite(wlink, pkt)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// Copyright (C) 2019-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
#include "wcfs_misc.h"
#include <golang/libgolang.h>
#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>
using namespace golang;
#include <inttypes.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <memory>
// golang::
namespace golang {
// os::
namespace os {
global<error> ErrClosed = errors::New("file already closed");
// TODO -> os.PathError + err=syscall.Errno
static error _pathError(const char *op, const string &path, int syserr);
int _File::fd() const { return _fd; }
string _File::name() const { return _path; }
_File::_File() {}
_File::~_File() {}
void _File::decref() {
if (__decref())
delete this;
}
tuple<File, error> open(const string &path, int flags, mode_t mode) {
int fd = ::open(path.c_str(), flags, mode);
if (fd == -1)
return make_tuple(nil, _pathError("open", path, errno));
File f = adoptref(new _File);
f->_path = path;
f->_fd = fd;
return make_tuple(f, nil);
}
error _File::close() {
_File& f = *this;
int err = ::close(f._fd);
if (err != 0)
return f._errno("close");
f._fd = -1;
return nil;
}
tuple<int, error> _File::read(void *buf, size_t count) {
_File& f = *this;
int n;
n = ::read(f._fd, buf, count);
if (n == 0)
return make_tuple(n, io::EOF_);
if (n < 0)
return make_tuple(0, f._errno("read"));
return make_tuple(n, nil);
}
tuple <int, error> _File::write(const void *buf, size_t count) {
_File& f = *this;
int n, wrote=0;
// NOTE contrary to write(2) we have to write all data as io.Writer requires.
while (count != 0) {
n = ::write(f._fd, buf, count);
if (n < 0)
return make_tuple(wrote, f._errno("write"));
wrote += n;
buf = ((const char *)buf) + n;
count -= n;
}
return make_tuple(wrote, nil);
}
error _File::stat(struct stat *st) {
_File& f = *this;
int err = fstat(f._fd, st);
if (err != 0)
return f._errno("stat");
return nil;
}
// _errno returns error corresponding to op(file) and errno.
error _File::_errno(const char *op) {
_File& f = *this;
return _pathError(op, f._path, errno);
}
// _pathError returns os.PathError-like for op/path and system error
// indicated by syserr.
static error _pathError(const char *op, const string &path, int syserr) {
char ebuf[128];
char *estr = strerror_r(syserr, ebuf, sizeof(ebuf));
return fmt::errorf("%s %s: %s", op, v(path), estr); // TODO estr -> syscall.Errno
}
} // os::
// mm::
namespace mm {
// map memory-maps f.fd[offset +size) somewhere into memory.
// prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED must not be used.
tuple<uint8_t*, error> map(int prot, int flags, os::File f, off_t offset, size_t size) {
void *addr;
if (flags & MAP_FIXED)
panic("MAP_FIXED not allowed for map - use map_into");
addr = ::mmap(nil, size, prot, flags, f->fd(), offset);
if (addr == MAP_FAILED)
return make_tuple(nil, os::_pathError("mmap", f->name(), errno));
return make_tuple((uint8_t*)addr, nil);
}
// map_into memory-maps f.fd[offset +size) into [addr +size).
// prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED is added automatically.
error map_into(void *addr, size_t size, int prot, int flags, os::File f, off_t offset) {
void *addr2;
addr2 = ::mmap(addr, size, prot, MAP_FIXED | flags, f->fd(), offset);
if (addr2 == MAP_FAILED)
return os::_pathError("mmap", f->name(), errno);
if (addr2 != addr)
panic("mmap(addr, MAP_FIXED): returned !addr");
return nil;
}
// unmap unmaps [addr +size) memory previously mapped with map & co.
error unmap(void *addr, size_t size) {
int err = ::munmap(addr, size);
if (err != 0)
return os::_pathError("munmap", "<memory>", errno);
return nil;
}
} // mm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path) {
// errctx is ok as returned by all calls.
os::File f;
error err;
tie(f, err) = os::open(path);
if (err != nil)
return make_tuple("", err);
string data;
vector<char> buf(4096);
while (1) {
int n;
tie(n, err) = f->read(&buf[0], buf.size());
data.append(&buf[0], n);
if (err != nil) {
if (err == io::EOF_)
err = nil;
break;
}
}
error err2 = f->close();
if (err == nil)
err = err2;
if (err != nil)
data = "";
return make_tuple(data, err);
}
}} // io::ioutil::
// xstrconv:: (strconv-like)
namespace xstrconv {
// parseHex64 decodes 16-character-wide hex-encoded string into uint64.
tuple<uint64_t, error> parseHex64(const string& s) {
if (s.size() != 16)
return make_tuple(0, fmt::errorf("hex64 %s invalid", v(s)));
uint64_t v;
int n = sscanf(s.c_str(), "%16" SCNx64, &v);
if (n != 1)
return make_tuple(0, fmt::errorf("hex64 %s invalid", v(s)));
return make_tuple(v, nil);
}
// parseInt decodes string s as signed decimal integer.
tuple<int64_t, error> parseInt(const string& s) {
int64_t v;
int n = sscanf(s.c_str(), "%" SCNi64, &v);
if (!(n == 1 && std::to_string(v) == s))
return make_tuple(0, fmt::errorf("int %s invalid", v(s)));
return make_tuple(v, nil);
}
// parseUint decodes string s as unsigned decimal integer.
tuple<uint64_t, error> parseUint(const string& s) {
uint64_t v;
int n = sscanf(s.c_str(), "%" SCNu64, &v);
if (!(n == 1 && std::to_string(v) == s))
return make_tuple(0, fmt::errorf("uint %s invalid", v(s)));
return make_tuple(v, nil);
}
} // xstrconv::
} // golang::
// xerr::
namespace xerr {
// XXX don't require fmt::vsprintf
#if 0
Contextf::Contextf(const char *format, ...) {
Contextf& c = *this;
va_list argp;
va_start(argp, format);
c.errctx = fmt::sprintfv(format, argp);
va_end(argp);
}
#endif
error Contextf::operator() (error err) const {
const Contextf& c = *this;
if (err == nil)
return nil;
return fmt::errorf("%s: %w", v(c.errctx), err);
}
} // xerr::
#include <golang/time.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/syscall.h>
// golang::log::
namespace golang {
namespace log {
void __Logf(const char *file, int line, char level, const char *format, ...) {
double t = time::now();
time_t t_int = time_t(t);
struct tm tm_loc;
localtime_r(&t_int, &tm_loc);
char t_buf[32];
strftime(t_buf, sizeof(t_buf), "%m%d %H:%M:%S", &tm_loc);
int t_us = int((t-t_int)*1E6);
pid_t tid = syscall(SYS_gettid);
string prefix = fmt::sprintf("%c%s.%06d % 7d %s:%d] ", level, t_buf, t_us, tid, file, line);
// TODO better to emit prefix and msg in one go.
flockfile(stderr);
fprintf(stderr, "%s", v(prefix));
va_list argp;
va_start(argp, format);
vfprintf(stderr, format, argp);
va_end(argp);
fprintf(stderr, "\n");
funlockfile(stderr);
}
}} // golang::log::
// wcfs::
namespace wcfs {
template<> string v_(error err) {
return (err != nil) ? err->Error() : "nil";
}
static string h016(uint64_t v) { return fmt::sprintf("%016lx", v); }
template<> string v_(const zodb::Tid& tid) { return h016(tid); }
//template<> string v_(zodb::Oid oid) { return h016(oid); }
// XXX Tid and Oid are typedefs for uint64_t and C++ reduces template
// specializations to the underlying type. This providing specialization for
// both Tid and Oid results in "multiple definition" error.
} // wcfs::
// Copyright (C) 2019-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// wcfs_misc.{h,cpp} provide miscellaneous utilities for other wcfs_* files.
#ifndef _NXD_WCFS_MISC_H_
#define _NXD_WCFS_MISC_H_
// XXX hack: C++ does not have __builtin_types_compatible_p, but CCAN configure
// thinks it does because CCAN is configured via C, not C++.
#include <config.h>
#undef HAVE_BUILTIN_TYPES_COMPATIBLE_P
#define HAVE_BUILTIN_TYPES_COMPATIBLE_P 0
#include <ccan/array_size/array_size.h>
#include <stddef.h>
#include <stdint.h>
#include <golang/libgolang.h>
using namespace golang;
#include <string>
using std::string;
#include <utility>
using std::pair;
using std::make_pair;
#include <tuple>
using std::tuple;
using std::make_tuple;
using std::tie;
#include <vector>
using std::vector;
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// golang::
namespace golang {
// os::
namespace os {
extern global<error> ErrClosed;
// os::File mimics os.File from Go.
// its operations return error with full file context.
typedef refptr<class _File> File;
class _File : public object {
int _fd;
string _path;
// don't new - create via open
private:
_File();
~_File();
friend tuple<File, error> open(const string &path, int flags, mode_t mode);
public:
void decref();
public:
int fd() const;
string name() const;
error close();
// read implements io.Reader from Go: it reads into buf up-to count bytes.
// XXX buf -> slice<byte> ?
tuple<int, error> read(void *buf, size_t count);
// write implements io.Writer from Go: it writes all data from buf.
//
// NOTE write behaves like io.Writer in Go - it tries to write as much
// bytes as requested, and if it could write only less - it returns error.
// XXX buf -> slice<byte> ?
tuple<int, error> write(const void *buf, size_t count);
error stat(struct stat *st);
private:
error _errno(const char *op);
};
// open opens file @path.
tuple<File, error> open(const string &path, int flags = O_RDONLY,
mode_t mode = S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
} // os::
// mm::
namespace mm {
tuple<uint8_t*, error> map(int prot, int flags, os::File f, off_t offset, size_t size);
error map_into(void *addr, size_t size, int prot, int flags, os::File f, off_t offset);
error unmap(void *addr, size_t size);
} // mm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path);
}} // io::ioutil::
// ---- misc ----
// xstrconv::
namespace xstrconv {
tuple<uint64_t, error> parseHex64(const string& s);
tuple<int64_t, error> parseInt(const string& s);
tuple<uint64_t, error> parseUint(const string& s);
} // xstrconv::
// log::
namespace log {
#define Debugf(format, ...) __Logf(__FILE__, __LINE__, 'D', format, ##__VA_ARGS__)
#define Infof(format, ...) __Logf(__FILE__, __LINE__, 'I', format, ##__VA_ARGS__)
#define Warnf(format, ...) __Logf(__FILE__, __LINE__, 'W', format, ##__VA_ARGS__)
#define Errorf(format, ...) __Logf(__FILE__, __LINE__, 'E', format, ##__VA_ARGS__)
#define Fatalf(format, ...) __Logf(__FILE__, __LINE__, 'F', format, ##__VA_ARGS__)
void __Logf(const char *file, int line, char level, const char *format, ...);
} // log::
} // golang::
// zodb::
namespace zodb {
typedef uint64_t Tid;
typedef uint64_t Oid;
} // zodb::
#include <golang/fmt.h>
// xerr::
namespace xerr {
// xerr::Contextf mimics xerr.Contextf from Go.
//
// Usage is a bit different(*) compared to Go:
//
// func doSomething(arg) {
// xerr.Contextf E("doing something %s", v(arg));
// ...
// return E(err);
// }
//
// (*) because C++ does not allow to modify returned value on the fly.
class Contextf {
string errctx;
public:
template<typename ...Argv>
inline Contextf(const char *format, Argv... argv) {
// XXX string() to avoid "error: format not a string literal" given by -Werror=format-security
errctx = fmt::sprintf(string(format), argv...);
}
error operator() (error) const;
};
} // xerr::
// wcfs::
namespace wcfs {
// TidHead is invalid Tid which is largest Tid value and means @head.
const zodb::Tid TidHead = -1ULL;
// v mimics %v for T to be used in printf & friends.
//
// NOTE returned char* pointer is guaranteed to stay valid only till end of
// current expression. For example
//
// printf("hello %s", v(obj))
//
// is valid, while
//
// x = v(obj);
// use(x);
//
// is not valid.
#define v(obj) (wcfs::v_(obj).c_str())
template<typename T> string v_(T* obj) { return obj->String(); }
template<typename T> string v_(const T* obj) { return obj->String(); }
template<typename T> string v_(const T& obj) { return obj.String(); }
template<typename T> string v_(refptr<T> obj) { return obj->String(); }
template<> inline string v_(const string& s) { return s; }
template<> string v_(error);
template<> string v_(const zodb::Tid&);
template<> string v_(const zodb::Oid&);
} // wcfs::
#endif
This diff is collapsed.
// Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// wcfs_watchlink provides WatchLink class that implements message exchange
// over /head/watch on wcfs.
#ifndef _NXD_WCFS_WATCHLINK_H_
#define _NXD_WCFS_WATCHLINK_H_
#include <golang/libgolang.h>
#include <golang/context.h>
#include <golang/cxx.h>
#include <golang/sync.h>
using namespace golang;
using cxx::dict;
using cxx::set;
#include "wcfs.h"
#include "wcfs_misc.h"
// wcfs::
namespace wcfs {
struct PinReq;
// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;
// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
// stream over which the data was received
StreamID stream;
// raw data received/to-be-sent.
// XXX not e.g. string, as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit rx line length to
// avoid DoS, but just for DoS the limit would be higher.
uint16_t datalen;
char data[256 - sizeof(StreamID) - sizeof(uint16_t)];
error from_string(const string& rx);
string to_string() const;
};
static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low for long error message
// WatchLink represents /head/watch link opened on wcfs.
//
// It is created by WCFS._openwatch().
//
// .sendReq()/.recvReq() provides raw IO in terms of wcfs isolation protocol messages.
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public object {
WCFS *_wc;
os::File _f; // head/watch file handle
string _rxbuf; // buffer for data already read from _f
// iso.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
bool _down; // y when the link is no-longer operational
bool _rxeof; // y if EOF was received from server
dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
StreamID _req_next; // stream ID for next client-originated request TODO -> atomic
sync::Mutex _txmu; // serializes writes
sync::Once _txclose1;
sync::WorkGroup _serveWG; // _serveRX is running under _serveWG
func<void()> _serveCancel;
// XXX for tests
public:
vector<string> fatalv; // ad-hoc, racy. TODO rework to send messages to control channel
chan<structZ> rx_eof; // becomes ready when wcfs closes its tx side
// don't new - create only via WCFS._openwatch()
private:
_WatchLink();
~_WatchLink();
friend pair<WatchLink, error> WCFS::_openwatch();
public:
void decref();
public:
error close();
error closeWrite();
pair<string, error> sendReq(context::Context ctx, const string &req);
error recvReq(context::Context ctx, PinReq *rx_into);
error replyReq(context::Context ctx, const PinReq *req, const string& reply);
string String() const;
int fd() const;
private:
error _serveRX(context::Context ctx);
tuple<string, error> _readline();
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
StreamID _nextReqID();
tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req);
friend error _twlinkwrite(WatchLink wlink, const string &pkt);
};
// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
struct PinReq {
StreamID stream; // request was received with this stream ID
zodb::Oid foid; // request is about this file
int64_t blk; // ----//---- about this block
zodb::Tid at; // pin to this at; TidHead means unpin to head
string msg; // XXX raw message for tests (TODO kill)
};
// for testing
error _twlinkwrite(WatchLink wlink, const string &pkt);
} // wcfs::
#endif
#!/usr/bin/env -S bpftrace
// Copyright (C) 2019-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// fusetrace - trace FUSE protocol exchange that goes over /dev/fuse.
#include <linux/fuse.h>
#include <linux/uio.h>
BEGIN {
@tstart = nsecs;
// XXX vvv commented to save space - else BPF for BEGIN might be rejected as too large
//@nread = {}; // {} tid -> nread
//@nwrite = {}; // {} tid -> nwrite
//@rpending[0] = ustack;
//@wpending[0] = ustack;
//clear(@rpending); // {} (tid,#nr) -> ustack
//clear(@wpending); // {} (tid,#nw) -> ustack
@opcode [FUSE_LOOKUP] = "LOOKUP";
@opcode [FUSE_FORGET] = "FORGET";
@opcode [FUSE_GETATTR] = "GETATTR";
@opcode [FUSE_SETATTR] = "SETATTR";
@opcode [FUSE_READLINK] = "READLINK";
@opcode [FUSE_SYMLINK] = "SYMLINK";
@opcode [FUSE_MKNOD] = "MKNOD";
@opcode [FUSE_MKDIR] = "MKDIR";
@opcode [FUSE_UNLINK] = "UNLINK";
@opcode [FUSE_RMDIR] = "RMDIR";
@opcode [FUSE_RENAME] = "RENAME";
@opcode [FUSE_LINK] = "LINK";
@opcode [FUSE_OPEN] = "OPEN";
@opcode [FUSE_READ] = "READ";
@opcode [FUSE_WRITE] = "WRITE";
@opcode [FUSE_STATFS] = "STATFS";
@opcode [FUSE_RELEASE] = "RELEASE";
@opcode [FUSE_FSYNC] = "FSYNC";
@opcode [FUSE_SETXATTR] = "SETXATTR";
@opcode [FUSE_GETXATTR] = "GETXATTR";
@opcode [FUSE_LISTXATTR] = "LISTXATTR";
@opcode [FUSE_REMOVEXATTR] = "REMOVEXATTR";
@opcode [FUSE_FLUSH] = "FLUSH";
@opcode [FUSE_INIT] = "INIT";
@opcode [FUSE_OPENDIR] = "OPENDIR";
@opcode [FUSE_READDIR] = "READDIR";
@opcode [FUSE_RELEASEDIR] = "RELEASEDIR";
@opcode [FUSE_FSYNCDIR] = "FSYNCDIR";
@opcode [FUSE_GETLK] = "GETLK";
@opcode [FUSE_SETLK] = "SETLK";
@opcode [FUSE_SETLKW] = "SETLKW";
@opcode [FUSE_ACCESS] = "ACCESS";
@opcode [FUSE_CREATE] = "CREATE";
@opcode [FUSE_INTERRUPT] = "INTERRUPT";
@opcode [FUSE_BMAP] = "BMAP";
@opcode [FUSE_DESTROY] = "DESTROY";
@opcode [FUSE_IOCTL] = "IOCTL";
@opcode [FUSE_POLL] = "POLL";
@opcode [FUSE_NOTIFY_REPLY] = "NOTIFY_REPLY";
@opcode [FUSE_BATCH_FORGET] = "BATCH_FORGET";
@opcode [FUSE_FALLOCATE] = "FALLOCATE";
@opcode [FUSE_READDIRPLUS] = "READDIRPLUS";
@opcode [FUSE_RENAME2] = "RENAME2";
@opcode [FUSE_LSEEK] = "LSEEK";
// XXX >= Linux x
//@opcode[FUSE_COPY_FILE_RANGE] = "COPY_FILE_RANGE";
@notify [FUSE_NOTIFY_POLL] = "NOTIFY_POLL";
@notify [FUSE_NOTIFY_INVAL_INODE] = "NOTIFY_INVAL_INODE";
@notify [FUSE_NOTIFY_INVAL_ENTRY] = "NOTIFY_INVAL_ENTRY";
@notify [FUSE_NOTIFY_STORE] = "NOTIFY_STORE";
@notify [FUSE_NOTIFY_RETRIEVE] = "NOTIFY_RETRIEVE";
@notify [FUSE_NOTIFY_DELETE] = "NOTIFY_DELETE";
}
END {
// don't print:
clear(@nread);
clear(@nwrite);
clear(@opcode);
clear(@notify);
clear(@rbuf);
// print @{r,w}pending
printf("\npending read/write:\n");
}
// reader enqueues
kprobe::fuse_dev_read {
$tr = (nsecs - @tstart) / 1000; // μs
$nr = @nread[tid];
@rpending[tid,$nr] = ustack;
// fuse_dev_do_read advances `to` - fetch/remember buffer pointer before.
$to = (struct iov_iter *)arg1;
$buf = $to->iov->iov_base;
@rbuf[tid] = $buf;
printf("P%d %d.%d /dev/fuse <- qread %s/%d_%d_r:\n", cpu, $tr/1000000, $tr%1000000, comm, tid, $nr);
printf("%s\n", ustack);
}
// read ready
kretprobe::fuse_dev_read {
$trr = (nsecs - @tstart) / 1000; // μs
$rr = @nread[tid];
@nread[tid] = $rr + 1;
delete(@rpending[tid,$rr]);
$h = (struct fuse_in_header *)@rbuf[tid];
delete(@rbuf[tid]);
$op = @opcode[$h->opcode];
printf("P%d %d.%d /dev/fuse -> read %s/%d_%d_r:\n", cpu, $trr/1000000, $trr%1000000, comm, tid, $rr);
// XXX ret=... ^^^
printf("\t.%d %s i%d ...\t\t(ret=%d)\n\n", $h->unique, $op, $h->nodeid, retval);
}
// write request
kprobe::fuse_dev_write {
$tw = (nsecs - @tstart) / 1000; // μs
$nw = @nwrite[tid];
@wpending[tid,$nw] = ustack;
$from = (struct iov_iter *)arg1;
$wbuf = $from->iov->iov_base;
$wh = (struct fuse_out_header *)$wbuf;
printf("P%d %d.%d /dev/fuse <- write %s/%d_%d_w:\n", cpu, $tw/1000000, $tw%1000000, comm, tid, $nw);
$u = $wh->unique;
$e = $wh->error;
if ($u == 0) {
$nop = @notify[$e];
//if ($nop != "") { // XXX https://github.com/iovisor/bpftrace/issues/402#issuecomment-461752005
printf("\t%s ...\n", $nop);
//} else {
// printf("\t?notify(%d) ...\n", $e);
//}
} else {
printf("\t.%d (%d) ...\n", $u, $e);
}
printf("%s\n", ustack);
}
// write ack
kretprobe::fuse_dev_write {
$tww = (nsecs - @tstart) / 1000; // μs
$ww = @nwrite[tid];
@nwrite[tid] = $ww + 1;
delete(@wpending[tid,$ww]);
printf("P%d %d.%d /dev/fuse -> write_ack %s/%d_%d_w", cpu, $tww/1000000, $tww%1000000, comm, tid, $ww);
printf(" (ret=%d)\n\n", retval);
}
// XXX splice stubs (so that splice IO is not missed)
kprobe::fuse_dev_splice_read {
printf("/dev/fuse -> splice read\n")
}
kprobe::fuse_dev_splice_write {
printf("/dev/fuse <- splice write:%s\n", kstack)
}
...@@ -563,25 +563,6 @@ func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) { ...@@ -563,25 +563,6 @@ func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) {
// ---- make df happy (else it complains "function not supported") ---- // ---- make df happy (else it complains "function not supported") ----
func (root *Root) StatFs() *fuse.StatfsOut {
return &fuse.StatfsOut{
// filesystem sizes (don't try to estimate)
Blocks: 0,
Bfree: 0,
Bavail: 0,
// do we need to count files?
Files: 0,
Ffree: 0,
// block size
Bsize: 2*1024*1024, // "optimal transfer block size" XXX better get from root?
Frsize: 2*1024*1024, // "fragment size"
NameLen: 255, // XXX ok? /proc uses the same
}
}
// ---- misc ---- // ---- misc ----
func panicf(format string, argv ...interface{}) { func panicf(format string, argv ...interface{}) {
......
This diff is collapsed.
...@@ -68,7 +68,7 @@ type zBlk interface { ...@@ -68,7 +68,7 @@ type zBlk interface {
loadBlkData(ctx context.Context) (data []byte, rev zodb.Tid, _ error) loadBlkData(ctx context.Context) (data []byte, rev zodb.Tid, _ error)
// inΔFtail returns pointer to struct zblkInΔFtail embedded into this ZBlk. // inΔFtail returns pointer to struct zblkInΔFtail embedded into this ZBlk.
inΔFtail() *zblkInΔFtail // inΔFtail() *zblkInΔFtail
// XXX kill - in favour of inΔFtail // XXX kill - in favour of inΔFtail
/* /*
...@@ -140,7 +140,7 @@ func (zb *zBlkBase) blkBoundTo() map[*BigFile]SetI64 { ...@@ -140,7 +140,7 @@ func (zb *zBlkBase) blkBoundTo() map[*BigFile]SetI64 {
// ZBlk0 mimics ZBlk0 from python. // ZBlk0 mimics ZBlk0 from python.
type ZBlk0 struct { type ZBlk0 struct {
zblkInΔFtail // zblkInΔFtail
zodb.Persistent zodb.Persistent
// NOTE py source uses bytes(buf) but on python2 it still results in str // NOTE py source uses bytes(buf) but on python2 it still results in str
...@@ -211,7 +211,7 @@ func (zd *zDataState) PySetState(pystate interface{}) error { ...@@ -211,7 +211,7 @@ func (zd *zDataState) PySetState(pystate interface{}) error {
// ZBlk1 mimics ZBlk1 from python. // ZBlk1 mimics ZBlk1 from python.
type ZBlk1 struct { type ZBlk1 struct {
zblkInΔFtail // zblkInΔFtail
zodb.Persistent zodb.Persistent
chunktab *btree.IOBTree // {} offset -> ZData(chunk) chunktab *btree.IOBTree // {} offset -> ZData(chunk)
......
// Code generated by gen-set BigFile *BigFile; DO NOT EDIT.
// Copyright (C) 2015-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// SetBigFile is a set of *BigFile.
type SetBigFile map[*BigFile]struct{}
// Add adds v to the set.
func (s SetBigFile) Add(v *BigFile) {
s[v] = struct{}{}
}
// Del removes v from the set.
// it is noop if v was not in the set.
func (s SetBigFile) Del(v *BigFile) {
delete(s, v)
}
// Has checks whether the set contains v.
func (s SetBigFile) Has(v *BigFile) bool {
_, ok := s[v]
return ok
}
// Update adds t values to s.
func (s SetBigFile) Update(t SetBigFile) {
for v := range t {
s.Add(v)
}
}
// Elements returns all elements of set as slice.
func (s SetBigFile) Elements() []*BigFile {
ev := make([]*BigFile, len(s))
i := 0
for e := range s {
ev[i] = e
i++
}
return ev
}
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment