Commit 4690460b authored by Kirill Smelkov's avatar Kirill Smelkov

Nogil IO

Provide C++ package "os" with File, Pipe, etc similarly to what is
provided on Go side. The package works through IO methods provided by
runtimes.

We need IO facility because os/signal package will need to use
pipe in cooperative IO mode in its receiving-loop goroutine.

os.h and os.cpp are based on drafts from wendelin.core:

https://lab.nexedi.com/nexedi/wendelin.core/blob/wendelin.core-2.0.alpha1-18-g38dde766/wcfs/client/wcfs_misc.h
https://lab.nexedi.com/nexedi/wendelin.core/blob/wendelin.core-2.0.alpha1-18-g38dde766/wcfs/client/wcfs_misc.cpp

/reviewed-on nexedi/pygolang!17
parent 07cae4e9
......@@ -21,6 +21,8 @@ include golang/fmt.cpp
include golang/fmt_test.cpp
include golang/io.h
include golang/io.cpp
include golang/os.h
include golang/os.cpp
include golang/strings.h
include golang/strings.cpp
include golang/strings_test.cpp
......
......@@ -7,6 +7,7 @@
/_golang.cpp
/_golang_test.cpp
/_io.cpp
/_os_test.cpp
/_strings_test.cpp
/_sync.cpp
/_sync_test.cpp
......
# -*- coding: utf-8 -*-
# cython: language_level=2
# distutils: language=c++
#
# Copyright (C) 2021-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import print_function, absolute_import
from golang cimport string, topyexc
# os_test.cpp
cdef extern from * nogil:
"""
extern void __test_os_fileio_cpp(const golang::string&);
extern void _test_os_pipe_cpp();
"""
void __test_os_fileio_cpp(string) except +topyexc
void _test_os_pipe_cpp() except +topyexc
def _test_os_fileio_cpp(tmp_path):
cdef string _tmpd = tmp_path
with nogil:
__test_os_fileio_cpp(_tmpd)
def test_os_pipe_cpp():
with nogil:
_test_os_pipe_cpp()
......@@ -173,6 +173,9 @@
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
// DSO symbols visibility (based on https://gcc.gnu.org/wiki/Visibility)
#if defined _WIN32 || defined __CYGWIN__
#define LIBGOLANG_DSO_EXPORT __declspec(dllexport)
......@@ -312,6 +315,7 @@ extern LIBGOLANG_API const _selcase _default;
// libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema;
typedef struct _libgolang_ioh _libgolang_ioh;
typedef enum _libgolang_runtime_flags {
// STACK_DEAD_WHILE_PARKED indicates that it is not safe to access
// goroutine's stack memory while the goroutine is parked.
......@@ -347,6 +351,43 @@ typedef struct _libgolang_runtime_ops {
// nanotime should return current time since EPOCH in nanoseconds.
uint64_t (*nanotime)(void);
// ---- IO -----
// NOTE syserr below is an error code always < 0, for example -ENOENT.
// io_open should open file @path similarly to open(2), but the error is
// returned in out_syserr, _not_ in errno.
_libgolang_ioh* (*io_open) (int* out_syserr, const char *path, int flags, mode_t mode);
// io_fdopen should wrap OS-level file descriptor into libgolang IO handle.
// the ownership of sysfd is transferred to returned ioh.
_libgolang_ioh* (*io_fdopen)(int* out_syserr, int sysfd);
// io_close should close underlying file and release file resources
// associated with ioh. it will be called exactly once and with the
// guarantee that no other ioh operation is running durion io_close call.
int/*syserr*/ (*io_close)(_libgolang_ioh* ioh);
// io_free should release ioh memory.
// it will be called exactly once after io_close.
void (*io_free) (_libgolang_ioh* ioh);
// io_sysfd should return OS-level file descriptor associated with
// libgolang IO handle. it should return -1 on error.
int/*sysfd*/ (*io_sysfd) (_libgolang_ioh* ioh);
// io_read should read up to count bytes of data from ioh.
// it should block if waiting for at least some data is needed.
// it should return read amount, 0 on EOF, or syserr on error.
int/*(n|syserr)*/ (*io_read) (_libgolang_ioh* ioh, void *buf, size_t count);
// io_write should write up to count bytes of data into ioh.
// it should block if waiting is needed to write at least some data.
// it should return wrote amount, or syserr on error.
int/*(n|syserr)*/ (*io_write)(_libgolang_ioh* ioh, const void *buf, size_t count);
// io_fstat should stat the file underlying ioh similarly to fstat(2).
int/*syserr*/ (*io_fstat)(struct stat *out_st, _libgolang_ioh* ioh);
} _libgolang_runtime_ops;
LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
......
// Copyright (C) 2019-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package os mirrors Go package os.
// See os.h for package overview.
#include "golang/errors.h"
#include "golang/fmt.h"
#include "golang/io.h"
#include "golang/os.h"
#include "golang/time.h"
#include "golang/runtime/internal.h"
#include "golang/runtime/internal/syscall.h"
#include <unistd.h>
#include <string.h>
using golang::internal::_runtime;
namespace sys = golang::internal::syscall;
using std::tuple;
using std::make_tuple;
using std::tie;
// golang::os::
namespace golang {
namespace os {
global<error> ErrClosed = errors::New("file already closed");
// TODO -> os.PathError
static error _pathError(const char *op, const string &path, error err);
// File
string _File::Name() const { return _path; }
_File::_File() {}
_File::~_File() {}
void _File::decref() {
if (__decref()) {
_File& f = *this;
f.Close(); // ignore error
_runtime->io_free(f._ioh);
f._ioh = nil;
delete this;
}
}
File _newFile(_libgolang_ioh* ioh, const string& name) {
File f = adoptref(new _File);
f->_path = name;
f->_ioh = ioh;
f->_inflight.store(0);
f->_closed.store(false);
return f;
}
tuple<File, error> Open(const string &path, int flags, mode_t mode) {
int syserr;
_libgolang_ioh* ioh = _runtime->io_open(&syserr,
path.c_str(), flags, mode);
if (syserr != 0)
return make_tuple(nil, _pathError("open", path, sys::NewErrno(syserr)));
return make_tuple(_newFile(ioh, path), nil);
}
tuple<File, error> NewFile(int sysfd, const string& name) {
int syserr;
_libgolang_ioh* ioh = _runtime->io_fdopen(&syserr, sysfd);
if (syserr != 0)
return make_tuple(nil, _pathError("fdopen", name, sys::NewErrno(syserr)));
return make_tuple(_newFile(ioh, name), nil);
}
error _File::Close() {
_File& f = *this;
bool x = false;
if (!f._closed.compare_exchange_strong(x, true))
return f._err("close", ErrClosed);
// wait till all currently-inprogress IO is complete
//
// TODO try to interrupt those inprogress IO calls.
// It is not so easy however - for example on Linux sys_read from pipe is
// not interrupted by sys_close of that pipe. sys_read/sys_write on regular
// files are also not interrupted by sys_close. For sockets we could use
// sys_shutdown, but shutdown does not work for anything else but sockets.
//
// NOTE1 with io_uring any inflight operation can be cancelled.
// NOTE2 under gevent io_close does interrupt outstanding IO, at least for
// pollable file descriptors, with `cancel_wait_ex: [Errno 9] File
// descriptor was closed in another greenlet` exception.
//
// For now we use simplest-possible way to wait until all IO is complete.
while (1) {
if (f._inflight.load() == 0)
break;
time::sleep(1*time::microsecond);
}
int syserr = _runtime->io_close(f._ioh);
if (syserr != 0)
return f._err("close", sys::NewErrno(syserr));
return nil;
}
int _File::_sysfd() {
_File& f = *this;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return -1; // bad file descriptor
return _runtime->io_sysfd(f._ioh);
}
tuple<int, error> _File::Read(void *buf, size_t count) {
_File& f = *this;
int n;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return make_tuple(0, f._err("read", ErrClosed));
n = _runtime->io_read(f._ioh, buf, count);
if (n == 0)
return make_tuple(n, io::EOF_);
if (n < 0)
return make_tuple(0, f._err("read", sys::NewErrno(n)));
return make_tuple(n, nil);
}
tuple<int, error> _File::Write(const void *buf, size_t count) {
_File& f = *this;
int n, wrote=0;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return make_tuple(0, f._err("write", ErrClosed));
// NOTE contrary to write(2) we have to write all data as io.Writer requires.
while (count != 0) {
n = _runtime->io_write(f._ioh, buf, count);
if (n < 0)
return make_tuple(wrote, f._err("write", sys::NewErrno(n)));
wrote += n;
buf = ((const char *)buf) + n;
count -= n;
}
return make_tuple(wrote, nil);
}
error _File::Stat(struct stat *st) {
_File& f = *this;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return f._err("stat", ErrClosed);
int syserr = _runtime->io_fstat(st, f._ioh);
if (syserr != 0)
return f._err("stat", sys::NewErrno(syserr));
return nil;
}
// pipe
tuple<File, File, error> Pipe() {
int vfd[2], syserr;
syserr = sys::Pipe(vfd);
if (syserr != 0)
return make_tuple(nil, nil, fmt::errorf("pipe: %w", sys::NewErrno(syserr)));
File r, w;
error err;
tie(r, err) = NewFile(vfd[0], "|0");
if (err != nil) {
return make_tuple(nil, nil, fmt::errorf("pipe: |0: %w", err));
}
tie(w, err) = NewFile(vfd[1], "|1");
if (err != nil) {
r->Close(); // ignore err
return make_tuple(nil, nil, fmt::errorf("pipe: |1: %w", err));
}
return make_tuple(r, w, nil);
}
// _err returns error corresponding to op(file) and underlying error err.
error _File::_err(const char *op, error err) {
_File& f = *this;
return _pathError(op, f._path, err);
}
// _pathError returns os.PathError-like for op/path and underlying error err.
static error _pathError(const char *op, const string &path, error err) {
// TODO use fmt::v and once it lands in
// return fmt::errorf("%s %s: %s", op, v(path), err));
return fmt::errorf("%s %s: %w", op, path.c_str(), err);
}
}} // golang::os::
#ifndef _NXD_LIBGOLANG_OS_H
#define _NXD_LIBGOLANG_OS_H
//
// Copyright (C) 2019-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package os mirrors Go package os.
//
// - `File` represents an opened file.
// - `Open` opens file @path.
// - `Pipe` creates new pipe.
// - `NewFile` wraps OS-level file-descriptor into File.
//
// See also https://golang.org/pkg/os for Go os package documentation.
#include <golang/libgolang.h>
#include <golang/runtime/internal/atomic.h>
#include <fcntl.h>
#include <tuple>
// golang::os::
namespace golang {
namespace os {
// ErrClosed is returned as cause by operations on closed File.
extern LIBGOLANG_API global<error> ErrClosed;
// File mimics os.File from Go.
// its methods return error with path and operation in context.
typedef refptr<class _File> File;
class _File : public object {
_libgolang_ioh* _ioh;
string _path;
internal::atomic::int32ForkReset _inflight; // # of currently inflight IO operations
std::atomic<bool> _closed;
// don't new - create via Open, NewFile, Pipe, ...
private:
_File();
~_File();
friend File _newFile(_libgolang_ioh* ioh, const string& name);
public:
void decref();
public:
LIBGOLANG_API string Name() const;
LIBGOLANG_API error Close();
// Read implements io.Reader from Go: it reads into buf up-to count bytes.
// TODO buf,count -> slice<byte>
LIBGOLANG_API std::tuple<int, error> Read(void *buf, size_t count);
// Write implements io.Writer from Go: it writes all data from buf.
//
// NOTE write behaves like io.Writer in Go - it tries to write as much
// bytes as requested, and if it could write only less - it returns an error.
//
// TODO buf,count -> slice<byte>
LIBGOLANG_API std::tuple<int, error> Write(const void *buf, size_t count);
// Stat returns information about the file.
LIBGOLANG_API error Stat(struct stat *st);
public:
// _sysfd returns underlying OS file handle for the file.
//
// This handle is valid to use only until the File is alive and not closed.
LIBGOLANG_API int _sysfd();
private:
error _err(const char *op, error err);
};
// Open opens file @path.
LIBGOLANG_API std::tuple<File, error> Open(const string &path, int flags = O_RDONLY,
mode_t mode = S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
// NewFile wraps OS-level file-descriptor into File.
// The ownership of sysfd is transferred to File.
LIBGOLANG_API std::tuple<File, error> NewFile(int sysfd, const string& name);
// Pipe creates connected pair of files.
LIBGOLANG_API std::tuple</*r*/File, /*w*/File, error> Pipe();
}} // golang::os::
#endif // _NXD_LIBGOLANG_OS_H
// Copyright (C) 2021-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
#include "golang/io.h"
#include "golang/os.h"
#include "golang/_testing.h"
using namespace golang;
using std::tie;
void __test_os_fileio_cpp(const string& tmpd) {
string tpath = tmpd + "/1";
os::File f;
error err;
// open !existing
tie(f, err) = os::Open(tpath);
ASSERT(f == nil);
ASSERT(err != nil);
ASSERT_EQ(err->Error(), "open " + tpath + ": No such file or directory");
// open +w
tie(f, err) = os::Open(tpath, O_CREAT | O_RDWR);
ASSERT(f != nil);
ASSERT(err == nil);
// write
int n;
tie(n, err) = f->Write("hello world\n", 12);
ASSERT_EQ(n, 12);
ASSERT(err == nil);
// close
err = f->Close();
ASSERT(err == nil);
err = f->Close();
ASSERT(err != nil);
ASSERT_EQ(err->Error(), "close " + tpath + ": file already closed");
// read
tie(f, err) = os::Open(tpath);
ASSERT(f != nil);
ASSERT(err == nil);
char buf[128], *p=buf;
int count=20, got=0;
while (got < 12) {
tie(n, err) = f->Read(p, count);
ASSERT(err == nil);
ASSERT(n > 0);
ASSERT(n <= count);
p += n;
got += n;
count -= n;
}
ASSERT_EQ(got, 12);
ASSERT_EQ(string(buf, got), "hello world\n");
tie(n, err) = f->Read(buf, 10);
ASSERT_EQ(n, 0);
ASSERT_EQ(err, io::EOF_);
// fstat
struct stat st;
err = f->Stat(&st);
ASSERT(err == nil);
ASSERT_EQ(st.st_size, 12);
err = f->Close();
ASSERT(err == nil);
}
void _test_os_pipe_cpp() {
os::File r1, w2; // r1 <- w2
os::File r2, w1; // w1 -> r2
error err;
tie(r1, w2, err) = os::Pipe();
ASSERT(r1 != nil);
ASSERT(w2 != nil);
ASSERT(err == nil);
tie(r2, w1, err) = os::Pipe();
ASSERT(r2 != nil);
ASSERT(w1 != nil);
ASSERT(err == nil);
// T2: ->r2->w2 echo
go([r2,w2]() {
char buf[32];
error err;
while (1) {
int n, n2;
tie(n, err) = r2->Read(buf, sizeof(buf));
if (err == io::EOF_)
break;
ASSERT(err == nil);
ASSERT(0 < n && n <= sizeof(buf));
tie(n2, err) = w2->Write(buf, n);
ASSERT(err == nil);
ASSERT_EQ(n2, n);
}
err = r2->Close(); ASSERT(err == nil);
err = w2->Close(); ASSERT(err == nil);
});
// T1: send 1, 2, 3, ... to T2 and assert the numbers come back
int n;
char buf[32];
for (char c = 0; c < 100; ++c) {
buf[0] = c;
tie(n, err) = w1->Write(buf, 1);
ASSERT(err == nil);
ASSERT_EQ(n, 1);
buf[0] = -1;
tie(n, err) = r1->Read(buf, sizeof(buf));
ASSERT(err == nil);
ASSERT_EQ(n, 1);
ASSERT_EQ(buf[0], c);
}
err = w1->Close(); ASSERT(err == nil);
tie(n, err) = r1->Read(buf, sizeof(buf));
ASSERT_EQ(n, 0);
ASSERT_EQ(err, io::EOF_);
err = r1->Close(); ASSERT(err == nil);
}
# -*- coding: utf-8 -*-
# Copyright (C) 2021-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import print_function, absolute_import
from golang.golang_test import import_pyx_tests
import_pyx_tests("golang._os_test")
from golang._os_test import _test_os_fileio_cpp
from golang import b
# import_pyx_tests does not support passing fixtures into tests
def test_pyx_os_fileio_cpp(tmp_path):
_test_os_fileio_cpp(b(str(tmp_path)))
......@@ -184,6 +184,7 @@ def _with_build_defaults(kw): # -> (pygo, kw')
'strings.h',
'sync.h',
'time.h',
'os.h',
'pyx/runtime.h',
'_testing.h',
]])
......
......@@ -20,9 +20,12 @@
"""pyx declarations for libgolang bits that are only interesting for runtimes."""
from libc.stdint cimport uint64_t
from posix.fcntl cimport mode_t
from posix.stat cimport struct_stat
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
struct _libgolang_sema
struct _libgolang_ioh
enum _libgolang_runtime_flags:
STACK_DEAD_WHILE_PARKED
......@@ -39,6 +42,16 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
void (*nanosleep)(uint64_t)
uint64_t (*nanotime)()
_libgolang_ioh* (*io_open) (int* out_syserr, const char *path, int flags, mode_t mode)
_libgolang_ioh* (*io_fdopen) (int* out_syserr, int sysfd)
int (*io_close) (_libgolang_ioh* ioh)
void (*io_free) (_libgolang_ioh* ioh)
int (*io_sysfd) (_libgolang_ioh* ioh)
int (*io_read) (_libgolang_ioh* ioh, void *buf, size_t count)
int (*io_write) (_libgolang_ioh* ioh, const void *buf, size_t count)
int (*io_fstat) (struct_stat* out_st, _libgolang_ioh* ioh)
# XXX better take from golang.pxd, but there it is declared in `namespace
# "golang"` which fails for C-mode compiles.
void panic(const char *)
......@@ -21,8 +21,8 @@
from __future__ import print_function, absolute_import
# Gevent runtime uses gevent's greenlets and semaphores.
# When sema.acquire() blocks, gevent switches us from current to another greenlet.
# Gevent runtime uses gevent's greenlets, semaphores and file objects.
# When sema.acquire() or IO blocks, gevent switches us from current to another greenlet.
# gevent >= 1.5 stopped to provide pxd to its API
# https://github.com/gevent/gevent/issues/1568
......@@ -40,16 +40,25 @@ ELSE:
from gevent import sleep as pygsleep
from libc.stdint cimport uint64_t
from cpython cimport Py_INCREF, Py_DECREF
from libc.stdint cimport uint8_t, uint64_t
from cpython cimport PyObject, Py_INCREF, Py_DECREF
from cython cimport final
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
STACK_DEAD_WHILE_PARKED, panic
_libgolang_ioh, STACK_DEAD_WHILE_PARKED, panic
from golang.runtime.internal cimport syscall
from golang.runtime cimport _runtime_thread
from golang.runtime._runtime_pymisc cimport PyExc, pyexc_fetch, pyexc_restore
from golang cimport topyexc
from libc.stdlib cimport calloc, free