Commit a36cdcc3 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: client: Switch to File IO provided by Pygolang

Starting from version 0.1 pygolang provides File out of the box:

nexedi/pygolang@4690460b
https://pypi.org/project/pygolang/#pygolang-change-history

-> Use it and remove our custom File implementation that originally
served as POC for that pygolang functionality.
parent 11e023cf
# Wendelin.core | pythonic package setup # Wendelin.core | pythonic package setup
# Copyright (C) 2014-2021 Nexedi SA and Contributors. # Copyright (C) 2014-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
...@@ -373,7 +373,7 @@ setup( ...@@ -373,7 +373,7 @@ setup(
'ZODB >= 4', # for ZBigFile / ZBigArray 'ZODB >= 4', # for ZBigFile / ZBigArray
'zodbtools >= 0.0.0.dev8', # lib.zodb.dbstoropen + ... 'zodbtools >= 0.0.0.dev8', # lib.zodb.dbstoropen + ...
'pygolang >= 0.0.8', # defer, sync.WaitGroup, pyx/nogil channels ... 'pygolang >= 0.1', # defer, sync.WaitGroup, pyx/nogil channels ...
'six', # compat py2/py3 'six', # compat py2/py3
......
...@@ -229,8 +229,6 @@ using std::min; ...@@ -229,8 +229,6 @@ using std::min;
using std::max; using std::max;
using std::vector; using std::vector;
namespace ioutil = xgolang::io::ioutil;
#define TRACE 0 #define TRACE 0
#if TRACE #if TRACE
...@@ -277,7 +275,7 @@ error WCFS::_headWait(zodb::Tid at) { ...@@ -277,7 +275,7 @@ error WCFS::_headWait(zodb::Tid at) {
// XXX dumb implementation, because _headWait should go away. // XXX dumb implementation, because _headWait should go away.
while (1) { while (1) {
tie(xatStr, err) = ioutil::ReadFile(wc._path("head/at")); tie(xatStr, err) = os::ReadFile(wc._path("head/at"));
if (err != nil) if (err != nil)
return E(err); return E(err);
...@@ -321,8 +319,8 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -321,8 +319,8 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
wconn->_at = at; wconn->_at = at;
wconn->_wlink = wlink; wconn->_wlink = wlink;
os::RegisterAfterFork(newref( xos::RegisterAfterFork(newref(
static_cast<os::_IAfterFork*>( wconn._ptr() ) static_cast<xos::_IAfterFork*>( wconn._ptr() )
)); ));
context::Context pinCtx; context::Context pinCtx;
...@@ -434,8 +432,8 @@ error _Conn::close() { ...@@ -434,8 +432,8 @@ error _Conn::close() {
if (!errors::Is(err, context::canceled)) // canceled - ok if (!errors::Is(err, context::canceled)) // canceled - ok
reterr1(err); reterr1(err);
os::UnregisterAfterFork(newref( xos::UnregisterAfterFork(newref(
static_cast<os::_IAfterFork*>( &wconn ) static_cast<xos::_IAfterFork*>( &wconn )
)); ));
return E(eret); return E(eret);
......
...@@ -118,7 +118,8 @@ struct VMA; ...@@ -118,7 +118,8 @@ struct VMA;
namespace wcfs { namespace wcfs {
using namespace golang; using namespace golang;
namespace os = xgolang::xos; namespace os = golang::os;
namespace xos = xgolang::xos;
namespace mm = xgolang::xmm; namespace mm = xgolang::xmm;
namespace xstrconv = xgolang::xstrconv; namespace xstrconv = xgolang::xstrconv;
namespace log = xgolang::xlog; namespace log = xgolang::xlog;
...@@ -173,7 +174,7 @@ struct WCFS { ...@@ -173,7 +174,7 @@ struct WCFS {
// Conn logically mirrors ZODB.Connection . // Conn logically mirrors ZODB.Connection .
// It is safe to use Conn from multiple threads simultaneously. // It is safe to use Conn from multiple threads simultaneously.
typedef refptr<struct _Conn> Conn; typedef refptr<struct _Conn> Conn;
struct _Conn : os::_IAfterFork, object { struct _Conn : xos::_IAfterFork, object {
WCFS *_wc; WCFS *_wc;
WatchLink _wlink; // watch/receive pins for mappings created under this conn WatchLink _wlink; // watch/receive pins for mappings created under this conn
......
...@@ -44,92 +44,10 @@ namespace xos { ...@@ -44,92 +44,10 @@ namespace xos {
namespace io = golang::io; namespace io = golang::io;
global<error> ErrClosed = errors::New("file already closed");
// TODO -> os.PathError + err=syscall.Errno // TODO -> os.PathError + err=syscall.Errno
static error _pathError(const char *op, const string &path, int syserr); static error _pathError(const char *op, const string &path, int syserr);
static string _sysErrString(int syserr); static string _sysErrString(int syserr);
int _File::_sysfd() const { return _fd; }
string _File::Name() const { return _path; }
_File::_File() {}
_File::~_File() {}
void _File::decref() {
if (__decref())
delete this;
}
tuple<File, error> Open(const string &path, int flags, mode_t mode) {
int fd = ::open(path.c_str(), flags, mode);
if (fd == -1)
return make_tuple(nil, _pathError("open", path, errno));
File f = adoptref(new _File);
f->_path = path;
f->_fd = fd;
return make_tuple(f, nil);
}
error _File::Close() {
_File& f = *this;
int err = ::close(f._fd);
if (err != 0)
return f._errno("close");
f._fd = -1;
return nil;
}
tuple<int, error> _File::Read(void *buf, size_t count) {
_File& f = *this;
int n;
n = ::read(f._fd, buf, count);
if (n == 0)
return make_tuple(n, io::EOF_);
if (n < 0)
return make_tuple(0, f._errno("read"));
return make_tuple(n, nil);
}
tuple <int, error> _File::Write(const void *buf, size_t count) {
_File& f = *this;
int n, wrote=0;
// NOTE contrary to write(2) we have to write all data as io.Writer requires.
while (count != 0) {
n = ::write(f._fd, buf, count);
if (n < 0)
return make_tuple(wrote, f._errno("write"));
wrote += n;
buf = ((const char *)buf) + n;
count -= n;
}
return make_tuple(wrote, nil);
}
error _File::Stat(struct stat *st) {
_File& f = *this;
int err = fstat(f._fd, st);
if (err != 0)
return f._errno("stat");
return nil;
}
// _errno returns error corresponding to op(file) and errno.
error _File::_errno(const char *op) {
_File& f = *this;
return _pathError(op, f._path, errno);
}
// _pathError returns os.PathError-like for op/path and system error // _pathError returns os.PathError-like for op/path and system error
// indicated by syserr. // indicated by syserr.
static error _pathError(const char *op, const string &path, int syserr) { static error _pathError(const char *op, const string &path, int syserr) {
...@@ -207,7 +125,7 @@ namespace xmm { ...@@ -207,7 +125,7 @@ namespace xmm {
// map memory-maps f.fd[offset +size) somewhere into memory. // map memory-maps f.fd[offset +size) somewhere into memory.
// prot is PROT_* from mmap(2). // prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED must not be used. // flags is MAP_* from mmap(2); MAP_FIXED must not be used.
tuple<uint8_t*, error> map(int prot, int flags, xos::File f, off_t offset, size_t size) { tuple<uint8_t*, error> map(int prot, int flags, os::File f, off_t offset, size_t size) {
void *addr; void *addr;
if (flags & MAP_FIXED) if (flags & MAP_FIXED)
...@@ -223,7 +141,7 @@ tuple<uint8_t*, error> map(int prot, int flags, xos::File f, off_t offset, size_ ...@@ -223,7 +141,7 @@ tuple<uint8_t*, error> map(int prot, int flags, xos::File f, off_t offset, size_
// map_into memory-maps f.fd[offset +size) into [addr +size). // map_into memory-maps f.fd[offset +size) into [addr +size).
// prot is PROT_* from mmap(2). // prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED is added automatically. // flags is MAP_* from mmap(2); MAP_FIXED is added automatically.
error map_into(void *addr, size_t size, int prot, int flags, xos::File f, off_t offset) { error map_into(void *addr, size_t size, int prot, int flags, os::File f, off_t offset) {
void *addr2; void *addr2;
addr2 = ::mmap(addr, size, prot, MAP_FIXED | flags, f->_sysfd(), offset); addr2 = ::mmap(addr, size, prot, MAP_FIXED | flags, f->_sysfd(), offset);
...@@ -245,44 +163,6 @@ error unmap(void *addr, size_t size) { ...@@ -245,44 +163,6 @@ error unmap(void *addr, size_t size) {
} // xmm:: } // xmm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path) {
// errctx is ok as returned by all calls.
xos::File f;
error err;
tie(f, err) = xos::Open(path);
if (err != nil)
return make_tuple("", err);
string data;
vector<char> buf(4096);
while (1) {
int n;
tie(n, err) = f->Read(&buf[0], buf.size());
data.append(&buf[0], n);
if (err != nil) {
if (err == golang::io::EOF_)
err = nil;
break;
}
}
error err2 = f->Close();
if (err == nil)
err = err2;
if (err != nil)
data = "";
return make_tuple(data, err);
}
}} // io::ioutil::
// xstrconv:: (strconv-like) // xstrconv:: (strconv-like)
namespace xstrconv { namespace xstrconv {
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include <stdint.h> #include <stdint.h>
#include <golang/libgolang.h> #include <golang/libgolang.h>
#include <golang/os.h>
using namespace golang; using namespace golang;
#include <string> #include <string>
...@@ -51,63 +52,12 @@ using std::tie; ...@@ -51,63 +52,12 @@ using std::tie;
using std::vector; using std::vector;
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// xgolang:: // xgolang::
namespace xgolang { namespace xgolang {
// xos:: // xos::
namespace xos { namespace xos {
extern global<error> ErrClosed;
// os::File mimics os.File from Go.
// its operations return error with full file context.
typedef refptr<class _File> File;
class _File : public object {
int _fd;
string _path;
// don't new - create via open
private:
_File();
~_File();
friend tuple<File, error> Open(const string &path, int flags, mode_t mode);
public:
void decref();
public:
int _sysfd() const;
string Name() const;
error Close();
// read implements io.Reader from Go: it reads into buf up-to count bytes.
// XXX buf -> slice<byte> ?
tuple<int, error> Read(void *buf, size_t count);
// write implements io.Writer from Go: it writes all data from buf.
//
// NOTE write behaves like io.Writer in Go - it tries to write as much
// bytes as requested, and if it could write only less - it returns error.
// XXX buf -> slice<byte> ?
tuple<int, error> Write(const void *buf, size_t count);
error Stat(struct stat *st);
private:
error _errno(const char *op);
};
// Open opens file @path.
tuple<File, error> Open(const string &path, int flags = O_RDONLY,
mode_t mode = S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
// afterfork // afterfork
// IAfterFork is the interface that objects must implement to be notified after fork. // IAfterFork is the interface that objects must implement to be notified after fork.
...@@ -130,21 +80,13 @@ void UnregisterAfterFork(IAfterFork obj); ...@@ -130,21 +80,13 @@ void UnregisterAfterFork(IAfterFork obj);
// xmm:: // xmm::
namespace xmm { namespace xmm {
tuple<uint8_t*, error> map(int prot, int flags, xos::File f, off_t offset, size_t size); tuple<uint8_t*, error> map(int prot, int flags, os::File f, off_t offset, size_t size);
error map_into(void *addr, size_t size, int prot, int flags, xos::File f, off_t offset); error map_into(void *addr, size_t size, int prot, int flags, os::File f, off_t offset);
error unmap(void *addr, size_t size); error unmap(void *addr, size_t size);
} // xmm:: } // xmm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path);
}} // io::ioutil::
// ---- misc ---- // ---- misc ----
......
...@@ -63,8 +63,8 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -63,8 +63,8 @@ pair<WatchLink, error> WCFS::_openwatch() {
wlink->rx_eof = makechan<structZ>(); wlink->rx_eof = makechan<structZ>();
os::RegisterAfterFork(newref( xos::RegisterAfterFork(newref(
static_cast<os::_IAfterFork*>( wlink._ptr() ) static_cast<xos::_IAfterFork*>( wlink._ptr() )
)); ));
context::Context serveCtx; context::Context serveCtx;
...@@ -100,8 +100,8 @@ error _WatchLink::close() { ...@@ -100,8 +100,8 @@ error _WatchLink::close() {
if (err == nil) if (err == nil)
err = err3; err = err3;
os::UnregisterAfterFork(newref( xos::UnregisterAfterFork(newref(
static_cast<os::_IAfterFork*>( &wlink ) static_cast<xos::_IAfterFork*>( &wlink )
)); ));
return E(err); return E(err);
......
// Copyright (C) 2018-2021 Nexedi SA and Contributors. // Copyright (C) 2018-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -70,7 +70,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low ...@@ -70,7 +70,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low
// //
// It is safe to use WatchLink from multiple threads simultaneously. // It is safe to use WatchLink from multiple threads simultaneously.
typedef refptr<class _WatchLink> WatchLink; typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public os::_IAfterFork, object { class _WatchLink : public xos::_IAfterFork, object {
WCFS *_wc; WCFS *_wc;
os::File _f; // head/watch file handle os::File _f; // head/watch file handle
string _rxbuf; // buffer for data already read from _f string _rxbuf; // buffer for data already read from _f
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment