Commit 4690460b authored by Kirill Smelkov's avatar Kirill Smelkov

Nogil IO

Provide C++ package "os" with File, Pipe, etc similarly to what is
provided on Go side. The package works through IO methods provided by
runtimes.

We need IO facility because os/signal package will need to use
pipe in cooperative IO mode in its receiving-loop goroutine.

os.h and os.cpp are based on drafts from wendelin.core:

https://lab.nexedi.com/nexedi/wendelin.core/blob/wendelin.core-2.0.alpha1-18-g38dde766/wcfs/client/wcfs_misc.h
https://lab.nexedi.com/nexedi/wendelin.core/blob/wendelin.core-2.0.alpha1-18-g38dde766/wcfs/client/wcfs_misc.cpp

/reviewed-on !17
parent 07cae4e9
......@@ -21,6 +21,8 @@ include golang/fmt.cpp
include golang/fmt_test.cpp
include golang/io.h
include golang/io.cpp
include golang/os.h
include golang/os.cpp
include golang/strings.h
include golang/strings.cpp
include golang/strings_test.cpp
......
......@@ -7,6 +7,7 @@
/_golang.cpp
/_golang_test.cpp
/_io.cpp
/_os_test.cpp
/_strings_test.cpp
/_sync.cpp
/_sync_test.cpp
......
# -*- coding: utf-8 -*-
# cython: language_level=2
# distutils: language=c++
#
# Copyright (C) 2021-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import print_function, absolute_import
from golang cimport string, topyexc
# os_test.cpp
cdef extern from * nogil:
"""
extern void __test_os_fileio_cpp(const golang::string&);
extern void _test_os_pipe_cpp();
"""
void __test_os_fileio_cpp(string) except +topyexc
void _test_os_pipe_cpp() except +topyexc
def _test_os_fileio_cpp(tmp_path):
cdef string _tmpd = tmp_path
with nogil:
__test_os_fileio_cpp(_tmpd)
def test_os_pipe_cpp():
with nogil:
_test_os_pipe_cpp()
......@@ -173,6 +173,9 @@
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
// DSO symbols visibility (based on https://gcc.gnu.org/wiki/Visibility)
#if defined _WIN32 || defined __CYGWIN__
#define LIBGOLANG_DSO_EXPORT __declspec(dllexport)
......@@ -312,6 +315,7 @@ extern LIBGOLANG_API const _selcase _default;
// libgolang runtime - the runtime must be initialized before any other libgolang use.
typedef struct _libgolang_sema _libgolang_sema;
typedef struct _libgolang_ioh _libgolang_ioh;
typedef enum _libgolang_runtime_flags {
// STACK_DEAD_WHILE_PARKED indicates that it is not safe to access
// goroutine's stack memory while the goroutine is parked.
......@@ -347,6 +351,43 @@ typedef struct _libgolang_runtime_ops {
// nanotime should return current time since EPOCH in nanoseconds.
uint64_t (*nanotime)(void);
// ---- IO -----
// NOTE syserr below is an error code always < 0, for example -ENOENT.
// io_open should open file @path similarly to open(2), but the error is
// returned in out_syserr, _not_ in errno.
_libgolang_ioh* (*io_open) (int* out_syserr, const char *path, int flags, mode_t mode);
// io_fdopen should wrap OS-level file descriptor into libgolang IO handle.
// the ownership of sysfd is transferred to returned ioh.
_libgolang_ioh* (*io_fdopen)(int* out_syserr, int sysfd);
// io_close should close underlying file and release file resources
// associated with ioh. it will be called exactly once and with the
// guarantee that no other ioh operation is running durion io_close call.
int/*syserr*/ (*io_close)(_libgolang_ioh* ioh);
// io_free should release ioh memory.
// it will be called exactly once after io_close.
void (*io_free) (_libgolang_ioh* ioh);
// io_sysfd should return OS-level file descriptor associated with
// libgolang IO handle. it should return -1 on error.
int/*sysfd*/ (*io_sysfd) (_libgolang_ioh* ioh);
// io_read should read up to count bytes of data from ioh.
// it should block if waiting for at least some data is needed.
// it should return read amount, 0 on EOF, or syserr on error.
int/*(n|syserr)*/ (*io_read) (_libgolang_ioh* ioh, void *buf, size_t count);
// io_write should write up to count bytes of data into ioh.
// it should block if waiting is needed to write at least some data.
// it should return wrote amount, or syserr on error.
int/*(n|syserr)*/ (*io_write)(_libgolang_ioh* ioh, const void *buf, size_t count);
// io_fstat should stat the file underlying ioh similarly to fstat(2).
int/*syserr*/ (*io_fstat)(struct stat *out_st, _libgolang_ioh* ioh);
} _libgolang_runtime_ops;
LIBGOLANG_API void _libgolang_init(const _libgolang_runtime_ops *runtime_ops);
......
// Copyright (C) 2019-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package os mirrors Go package os.
// See os.h for package overview.
#include "golang/errors.h"
#include "golang/fmt.h"
#include "golang/io.h"
#include "golang/os.h"
#include "golang/time.h"
#include "golang/runtime/internal.h"
#include "golang/runtime/internal/syscall.h"
#include <unistd.h>
#include <string.h>
using golang::internal::_runtime;
namespace sys = golang::internal::syscall;
using std::tuple;
using std::make_tuple;
using std::tie;
// golang::os::
namespace golang {
namespace os {
global<error> ErrClosed = errors::New("file already closed");
// TODO -> os.PathError
static error _pathError(const char *op, const string &path, error err);
// File
string _File::Name() const { return _path; }
_File::_File() {}
_File::~_File() {}
void _File::decref() {
if (__decref()) {
_File& f = *this;
f.Close(); // ignore error
_runtime->io_free(f._ioh);
f._ioh = nil;
delete this;
}
}
File _newFile(_libgolang_ioh* ioh, const string& name) {
File f = adoptref(new _File);
f->_path = name;
f->_ioh = ioh;
f->_inflight.store(0);
f->_closed.store(false);
return f;
}
tuple<File, error> Open(const string &path, int flags, mode_t mode) {
int syserr;
_libgolang_ioh* ioh = _runtime->io_open(&syserr,
path.c_str(), flags, mode);
if (syserr != 0)
return make_tuple(nil, _pathError("open", path, sys::NewErrno(syserr)));
return make_tuple(_newFile(ioh, path), nil);
}
tuple<File, error> NewFile(int sysfd, const string& name) {
int syserr;
_libgolang_ioh* ioh = _runtime->io_fdopen(&syserr, sysfd);
if (syserr != 0)
return make_tuple(nil, _pathError("fdopen", name, sys::NewErrno(syserr)));
return make_tuple(_newFile(ioh, name), nil);
}
error _File::Close() {
_File& f = *this;
bool x = false;
if (!f._closed.compare_exchange_strong(x, true))
return f._err("close", ErrClosed);
// wait till all currently-inprogress IO is complete
//
// TODO try to interrupt those inprogress IO calls.
// It is not so easy however - for example on Linux sys_read from pipe is
// not interrupted by sys_close of that pipe. sys_read/sys_write on regular
// files are also not interrupted by sys_close. For sockets we could use
// sys_shutdown, but shutdown does not work for anything else but sockets.
//
// NOTE1 with io_uring any inflight operation can be cancelled.
// NOTE2 under gevent io_close does interrupt outstanding IO, at least for
// pollable file descriptors, with `cancel_wait_ex: [Errno 9] File
// descriptor was closed in another greenlet` exception.
//
// For now we use simplest-possible way to wait until all IO is complete.
while (1) {
if (f._inflight.load() == 0)
break;
time::sleep(1*time::microsecond);
}
int syserr = _runtime->io_close(f._ioh);
if (syserr != 0)
return f._err("close", sys::NewErrno(syserr));
return nil;
}
int _File::_sysfd() {
_File& f = *this;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return -1; // bad file descriptor
return _runtime->io_sysfd(f._ioh);
}
tuple<int, error> _File::Read(void *buf, size_t count) {
_File& f = *this;
int n;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return make_tuple(0, f._err("read", ErrClosed));
n = _runtime->io_read(f._ioh, buf, count);
if (n == 0)
return make_tuple(n, io::EOF_);
if (n < 0)
return make_tuple(0, f._err("read", sys::NewErrno(n)));
return make_tuple(n, nil);
}
tuple<int, error> _File::Write(const void *buf, size_t count) {
_File& f = *this;
int n, wrote=0;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return make_tuple(0, f._err("write", ErrClosed));
// NOTE contrary to write(2) we have to write all data as io.Writer requires.
while (count != 0) {
n = _runtime->io_write(f._ioh, buf, count);
if (n < 0)
return make_tuple(wrote, f._err("write", sys::NewErrno(n)));
wrote += n;
buf = ((const char *)buf) + n;
count -= n;
}
return make_tuple(wrote, nil);
}
error _File::Stat(struct stat *st) {
_File& f = *this;
f._inflight.fetch_add(+1);
defer([&]() {
f._inflight.fetch_add(-1);
});
if (f._closed.load())
return f._err("stat", ErrClosed);
int syserr = _runtime->io_fstat(st, f._ioh);
if (syserr != 0)
return f._err("stat", sys::NewErrno(syserr));
return nil;
}
// pipe
tuple<File, File, error> Pipe() {
int vfd[2], syserr;
syserr = sys::Pipe(vfd);
if (syserr != 0)
return make_tuple(nil, nil, fmt::errorf("pipe: %w", sys::NewErrno(syserr)));
File r, w;
error err;
tie(r, err) = NewFile(vfd[0], "|0");
if (err != nil) {
return make_tuple(nil, nil, fmt::errorf("pipe: |0: %w", err));
}
tie(w, err) = NewFile(vfd[1], "|1");
if (err != nil) {
r->Close(); // ignore err
return make_tuple(nil, nil, fmt::errorf("pipe: |1: %w", err));
}
return make_tuple(r, w, nil);
}
// _err returns error corresponding to op(file) and underlying error err.
error _File::_err(const char *op, error err) {
_File& f = *this;
return _pathError(op, f._path, err);
}
// _pathError returns os.PathError-like for op/path and underlying error err.
static error _pathError(const char *op, const string &path, error err) {
// TODO use fmt::v and once it lands in
// return fmt::errorf("%s %s: %s", op, v(path), err));
return fmt::errorf("%s %s: %w", op, path.c_str(), err);
}
}} // golang::os::
#ifndef _NXD_LIBGOLANG_OS_H
#define _NXD_LIBGOLANG_OS_H
//
// Copyright (C) 2019-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package os mirrors Go package os.
//
// - `File` represents an opened file.
// - `Open` opens file @path.
// - `Pipe` creates new pipe.
// - `NewFile` wraps OS-level file-descriptor into File.
//
// See also https://golang.org/pkg/os for Go os package documentation.
#include <golang/libgolang.h>
#include <golang/runtime/internal/atomic.h>
#include <fcntl.h>
#include <tuple>
// golang::os::
namespace golang {
namespace os {
// ErrClosed is returned as cause by operations on closed File.
extern LIBGOLANG_API global<error> ErrClosed;
// File mimics os.File from Go.
// its methods return error with path and operation in context.
typedef refptr<class _File> File;
class _File : public object {
_libgolang_ioh* _ioh;
string _path;
internal::atomic::int32ForkReset _inflight; // # of currently inflight IO operations
std::atomic<bool> _closed;
// don't new - create via Open, NewFile, Pipe, ...
private:
_File();
~_File();
friend File _newFile(_libgolang_ioh* ioh, const string& name);
public:
void decref();
public:
LIBGOLANG_API string Name() const;
LIBGOLANG_API error Close();
// Read implements io.Reader from Go: it reads into buf up-to count bytes.
// TODO buf,count -> slice<byte>
LIBGOLANG_API std::tuple<int, error> Read(void *buf, size_t count);
// Write implements io.Writer from Go: it writes all data from buf.
//
// NOTE write behaves like io.Writer in Go - it tries to write as much
// bytes as requested, and if it could write only less - it returns an error.
//
// TODO buf,count -> slice<byte>
LIBGOLANG_API std::tuple<int, error> Write(const void *buf, size_t count);
// Stat returns information about the file.
LIBGOLANG_API error Stat(struct stat *st);
public:
// _sysfd returns underlying OS file handle for the file.
//
// This handle is valid to use only until the File is alive and not closed.
LIBGOLANG_API int _sysfd();
private:
error _err(const char *op, error err);
};
// Open opens file @path.
LIBGOLANG_API std::tuple<File, error> Open(const string &path, int flags = O_RDONLY,
mode_t mode = S_IRUSR | S_IWUSR | S_IXUSR |
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
// NewFile wraps OS-level file-descriptor into File.
// The ownership of sysfd is transferred to File.
LIBGOLANG_API std::tuple<File, error> NewFile(int sysfd, const string& name);
// Pipe creates connected pair of files.
LIBGOLANG_API std::tuple</*r*/File, /*w*/File, error> Pipe();
}} // golang::os::
#endif // _NXD_LIBGOLANG_OS_H
// Copyright (C) 2021-2022 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
#include "golang/io.h"
#include "golang/os.h"
#include "golang/_testing.h"
using namespace golang;
using std::tie;
void __test_os_fileio_cpp(const string& tmpd) {
string tpath = tmpd + "/1";
os::File f;
error err;
// open !existing
tie(f, err) = os::Open(tpath);
ASSERT(f == nil);
ASSERT(err != nil);
ASSERT_EQ(err->Error(), "open " + tpath + ": No such file or directory");
// open +w
tie(f, err) = os::Open(tpath, O_CREAT | O_RDWR);
ASSERT(f != nil);
ASSERT(err == nil);
// write
int n;
tie(n, err) = f->Write("hello world\n", 12);
ASSERT_EQ(n, 12);
ASSERT(err == nil);
// close
err = f->Close();
ASSERT(err == nil);
err = f->Close();
ASSERT(err != nil);
ASSERT_EQ(err->Error(), "close " + tpath + ": file already closed");
// read
tie(f, err) = os::Open(tpath);
ASSERT(f != nil);
ASSERT(err == nil);
char buf[128], *p=buf;
int count=20, got=0;
while (got < 12) {
tie(n, err) = f->Read(p, count);
ASSERT(err == nil);
ASSERT(n > 0);
ASSERT(n <= count);
p += n;
got += n;
count -= n;
}
ASSERT_EQ(got, 12);
ASSERT_EQ(string(buf, got), "hello world\n");
tie(n, err) = f->Read(buf, 10);
ASSERT_EQ(n, 0);
ASSERT_EQ(err, io::EOF_);
// fstat
struct stat st;
err = f->Stat(&st);
ASSERT(err == nil);
ASSERT_EQ(st.st_size, 12);
err = f->Close();
ASSERT(err == nil);
}
void _test_os_pipe_cpp() {
os::File r1, w2; // r1 <- w2
os::File r2, w1; // w1 -> r2
error err;
tie(r1, w2, err) = os::Pipe();
ASSERT(r1 != nil);
ASSERT(w2 != nil);
ASSERT(err == nil);
tie(r2, w1, err) = os::Pipe();
ASSERT(r2 != nil);
ASSERT(w1 != nil);
ASSERT(err == nil);
// T2: ->r2->w2 echo
go([r2,w2]() {
char buf[32];
error err;
while (1) {
int n, n2;
tie(n, err) = r2->Read(buf, sizeof(buf));
if (err == io::EOF_)
break;
ASSERT(err == nil);
ASSERT(0 < n && n <= sizeof(buf));
tie(n2, err) = w2->Write(buf, n);
ASSERT(err == nil);
ASSERT_EQ(n2, n);
}
err = r2->Close(); ASSERT(err == nil);
err = w2->Close(); ASSERT(err == nil);
});
// T1: send 1, 2, 3, ... to T2 and assert the numbers come back
int n;
char buf[32];
for (char c = 0; c < 100; ++c) {
buf[0] = c;
tie(n, err) = w1->Write(buf, 1);
ASSERT(err == nil);
ASSERT_EQ(n, 1);
buf[0] = -1;
tie(n, err) = r1->Read(buf, sizeof(buf));
ASSERT(err == nil);
ASSERT_EQ(n, 1);
ASSERT_EQ(buf[0], c);
}
err = w1->Close(); ASSERT(err == nil);
tie(n, err) = r1->Read(buf, sizeof(buf));
ASSERT_EQ(n, 0);
ASSERT_EQ(err, io::EOF_);
err = r1->Close(); ASSERT(err == nil);
}
# -*- coding: utf-8 -*-
# Copyright (C) 2021-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import print_function, absolute_import
from golang.golang_test import import_pyx_tests
import_pyx_tests("golang._os_test")
from golang._os_test import _test_os_fileio_cpp
from golang import b
# import_pyx_tests does not support passing fixtures into tests
def test_pyx_os_fileio_cpp(tmp_path):
_test_os_fileio_cpp(b(str(tmp_path)))
......@@ -184,6 +184,7 @@ def _with_build_defaults(kw): # -> (pygo, kw')
'strings.h',
'sync.h',
'time.h',
'os.h',
'pyx/runtime.h',
'_testing.h',
]])
......
......@@ -20,9 +20,12 @@
"""pyx declarations for libgolang bits that are only interesting for runtimes."""
from libc.stdint cimport uint64_t
from posix.fcntl cimport mode_t
from posix.stat cimport struct_stat
cdef extern from "golang/libgolang.h" namespace "golang" nogil:
struct _libgolang_sema
struct _libgolang_ioh
enum _libgolang_runtime_flags:
STACK_DEAD_WHILE_PARKED
......@@ -39,6 +42,16 @@ cdef extern from "golang/libgolang.h" namespace "golang" nogil:
void (*nanosleep)(uint64_t)
uint64_t (*nanotime)()
_libgolang_ioh* (*io_open) (int* out_syserr, const char *path, int flags, mode_t mode)
_libgolang_ioh* (*io_fdopen) (int* out_syserr, int sysfd)
int (*io_close) (_libgolang_ioh* ioh)
void (*io_free) (_libgolang_ioh* ioh)
int (*io_sysfd) (_libgolang_ioh* ioh)
int (*io_read) (_libgolang_ioh* ioh, void *buf, size_t count)
int (*io_write) (_libgolang_ioh* ioh, const void *buf, size_t count)
int (*io_fstat) (struct_stat* out_st, _libgolang_ioh* ioh)
# XXX better take from golang.pxd, but there it is declared in `namespace
# "golang"` which fails for C-mode compiles.
void panic(const char *)
......@@ -21,8 +21,8 @@
from __future__ import print_function, absolute_import
# Gevent runtime uses gevent's greenlets and semaphores.
# When sema.acquire() blocks, gevent switches us from current to another greenlet.
# Gevent runtime uses gevent's greenlets, semaphores and file objects.
# When sema.acquire() or IO blocks, gevent switches us from current to another greenlet.
# gevent >= 1.5 stopped to provide pxd to its API
# https://github.com/gevent/gevent/issues/1568
......@@ -40,16 +40,25 @@ ELSE:
from gevent import sleep as pygsleep
from libc.stdint cimport uint64_t
from cpython cimport Py_INCREF, Py_DECREF
from libc.stdint cimport uint8_t, uint64_t
from cpython cimport PyObject, Py_INCREF, Py_DECREF
from cython cimport final
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
STACK_DEAD_WHILE_PARKED, panic
_libgolang_ioh, STACK_DEAD_WHILE_PARKED, panic
from golang.runtime.internal cimport syscall
from golang.runtime cimport _runtime_thread
from golang.runtime._runtime_pymisc cimport PyExc, pyexc_fetch, pyexc_restore
from golang cimport topyexc
from libc.stdlib cimport calloc, free
from libc.errno cimport EBADF
from posix.fcntl cimport mode_t, F_GETFL, F_SETFL, O_NONBLOCK, O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR
from posix.stat cimport struct_stat, S_ISREG, S_ISDIR, S_ISBLK
from posix.strings cimport bzero
from gevent.fileobject import FileObjectThread, FileObjectPosix
# _goviapy & _togo serve go
def _goviapy(_togo _ not None):
......@@ -114,6 +123,8 @@ cdef nogil:
if not ok:
panic("pyxgo: gevent: go: failed")
# ---- semaphore ----
_libgolang_sema* sema_alloc():
cdef PyExc exc
with gil:
......@@ -149,6 +160,8 @@ cdef nogil:
if not ok:
panic("pyxgo: gevent: sema: release: failed")
# ---- time ----
void nanosleep(uint64_t dt):
cdef PyExc exc
with gil:
......@@ -158,6 +171,210 @@ cdef nogil:
if not ok:
panic("pyxgo: gevent: sleep: failed")
# ---- IO ----
struct IOH:
PyObject* pygfobj # FileObjectPosix | FileObjectThread
int sysfd # for direct access == pygfobj.fileno()
_libgolang_ioh* io_open(int *out_syserr, const char *path, int flags, mode_t mode):
# open the file and see in io_fdopen whether we can make its IO to be cooperative
# no need to open with O_NONBLOCK because it does not affect anything at open time
sysfd = syscall.Open(path, flags, mode)
if sysfd < 0:
out_syserr[0] = sysfd
return NULL
return io_fdopen(out_syserr, sysfd)
_libgolang_ioh* io_fdopen(int *out_syserr, int sysfd):
# close sysfd on any error
ioh = _io_fdopen(out_syserr, sysfd)
if ioh == NULL:
syscall.Close(sysfd) # ignore err
return ioh
_libgolang_ioh* _io_fdopen(int *out_syserr, int sysfd):
# check if we should enable O_NONBLOCK on this file-descriptor
# even though we could enable O_NONBLOCK for regular files, it does not
# work as expected as most unix'es report regular files as always read
# and write ready.
cdef struct_stat st
cdef int syserr = syscall.Fstat(sysfd, &st)
if syserr < 0:
out_syserr[0] = syserr
return NULL
m = st.st_mode
blocking = (S_ISREG(m) or S_ISDIR(m) or S_ISBLK(m)) # fd cannot refer to symlink
# retrieve current sysfd flags and access mode
flags = syscall.Fcntl(sysfd, F_GETFL, 0)
if flags < 0:
out_syserr[0] = flags
return NULL
acc = (flags & O_ACCMODE)
# enable O_NONBLOCK if needed
if not blocking:
syserr = syscall.Fcntl(sysfd, F_SETFL, flags | O_NONBLOCK)
if syserr < 0:
out_syserr[0] = syserr
return NULL
# create IOH backed by FileObjectThread or FileObjectPosix
ioh = <IOH*>calloc(1, sizeof(IOH))
if ioh == NULL:
panic("out of memory")
cdef PyObject* pygfobj = NULL
cdef PyExc exc
with gil:
pyexc_fetch(&exc)
ok = __io_fdopen(&pygfobj, out_syserr, sysfd, blocking, acc)
pyexc_restore(exc)
if not ok:
panic("pyxgo: gevent: io: fdopen: failed")
if pygfobj == NULL:
return NULL
ioh.pygfobj = pygfobj
ioh.sysfd = sysfd
return <_libgolang_ioh*>ioh
cdef:
bint __io_fdopen(PyObject** ppygfobj, int *out_syserr, int sysfd, bint blocking, int acc):
mode = 'b'
if acc == O_RDONLY:
mode += 'r'
elif acc == O_WRONLY:
mode += 'w'
elif acc == O_RDWR:
mode += 'w+'
pygfobj = None
try:
if blocking:
pygfobj = FileObjectThread(sysfd, mode=mode, buffering=0)
else:
pygfobj = FileObjectPosix(sysfd, mode=mode, buffering=0)
except OSError as e:
out_syserr[0] = -e.errno
else:
Py_INCREF(pygfobj)
ppygfobj[0] = <PyObject*>pygfobj
out_syserr[0] = 0
return True
cdef nogil:
int io_close(_libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
cdef int syserr
cdef PyExc exc # XXX also save/restore errno (+everywhere)
with gil:
pyexc_fetch(&exc)
ok = _io_close(ioh, &syserr)
pyexc_restore(exc)
if not ok:
panic("pyxgo: gevent: io: close: failed")
return syserr
cdef:
bint _io_close(IOH* ioh, int* out_syserr):
pygfobj = <object>ioh.pygfobj
try:
if ioh.sysfd == -1:
out_syserr[0] = -EBADF
else:
pygfobj.close()
ioh.sysfd = -1
out_syserr[0] = 0
except OSError as e:
out_syserr[0] = -e.errno
return True
cdef nogil:
void io_free(_libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
cdef PyExc exc
with gil:
pyexc_fetch(&exc)
ok = _io_free(ioh)
pyexc_restore(exc)
if not ok:
panic("pyxgo: gevent: io: free: failed")
bzero(ioh, sizeof(IOH))
free(ioh)
cdef:
bint _io_free(IOH* ioh):
pygfobj = <object>ioh.pygfobj
ioh.pygfobj = NULL
Py_DECREF(pygfobj)
return True
cdef nogil:
int io_sysfd(_libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
return ioh.sysfd
cdef nogil:
int io_read(_libgolang_ioh* _ioh, void *buf, size_t count):
ioh = <IOH*>_ioh
cdef int n
cdef PyExc exc
with gil:
pyexc_fetch(&exc)
ok = _io_read(ioh, &n, buf, count)
pyexc_restore(exc)
if not ok:
panic("pyxgo: gevent: io: read: failed")
return n
cdef:
bint _io_read(IOH* ioh, int* out_n, void *buf, size_t count):
pygfobj = <object>ioh.pygfobj
cdef uint8_t[::1] mem = <uint8_t[:count]>buf
xmem = memoryview(mem) # to avoid https://github.com/cython/cython/issues/3900 on mem[:0]=b''
try:
n = pygfobj.readinto(xmem)
except OSError as e:
n = -e.errno
out_n[0] = n
return True
cdef nogil:
int io_write(_libgolang_ioh* _ioh, const void *buf, size_t count):
ioh = <IOH*>_ioh
cdef int n
cdef PyExc exc
with gil:
pyexc_fetch(&exc)
ok = _io_write(ioh, &n, buf, count)
pyexc_restore(exc)
if not ok:
panic("pyxgo: gevent: io: write: failed")
return n
cdef:
bint _io_write(IOH* ioh, int* out_n, const void *buf, size_t count):
pygfobj = <object>ioh.pygfobj
cdef const uint8_t[::1] mem = <const uint8_t[:count]>buf
try:
n = pygfobj.write(mem)
except OSError as e:
n = -e.errno
out_n[0] = n
return True
int io_fstat(struct_stat* out_st, _libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
return syscall.Fstat(ioh.sysfd, out_st)
cdef nogil:
# XXX const
_libgolang_runtime_ops gevent_ops = _libgolang_runtime_ops(
......@@ -172,6 +389,14 @@ cdef nogil:
sema_release = sema_release,
nanosleep = nanosleep,
nanotime = _runtime_thread.nanotime, # reuse from _runtime_thread
io_open = io_open,
io_fdopen = io_fdopen,
io_close = io_close,
io_free = io_free,
io_sysfd = io_sysfd,
io_read = io_read,
io_write = io_write,
io_fstat = io_fstat,
)
from cpython cimport PyCapsule_New
......
# cython: language_level=2
# Copyright (C) 2019-2020 Nexedi SA and Contributors.
# Copyright (C) 2019-2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -31,6 +31,8 @@ from __future__ import print_function, absolute_import
# new thread - does not depend on GIL. On POSIX, for example, it is small
# wrapper around pthread_create.
#
# For IO direct OS system calls, such as read and write, are used.
#
# NOTE Cython declares PyThread_acquire_lock/PyThread_release_lock as nogil
from cpython.pythread cimport PyThread_acquire_lock, PyThread_release_lock, \
PyThread_type_lock, WAIT_LOCK
......@@ -87,12 +89,17 @@ cdef extern from "pythread.h" nogil:
void PyThread_free_lock(PyThread_type_lock)
from golang.runtime._libgolang cimport _libgolang_runtime_ops, _libgolang_sema, \
_libgolang_runtime_flags, panic
_libgolang_ioh, _libgolang_runtime_flags, panic
from golang.runtime.internal cimport syscall
from libc.stdint cimport uint64_t, UINT64_MAX
from libc.stdlib cimport calloc, free
from libc.errno cimport errno, EINTR, EBADF
from posix.fcntl cimport mode_t
from posix.stat cimport struct_stat
from posix.strings cimport bzero
IF POSIX:
from posix.time cimport clock_gettime, nanosleep as posix_nanosleep, timespec, CLOCK_REALTIME
from libc.errno cimport errno, EINTR
ELSE:
# !posix via-gil timing fallback
import time as pytimemod
......@@ -119,6 +126,8 @@ cdef nogil:
if pytid == -1:
panic("pygo: failed")
# ---- semaphore ----
_libgolang_sema* sema_alloc():
# python calls it "lock", but it is actually a semaphore.
# and in particular can be released by thread different from thread that acquired it.
......@@ -139,6 +148,8 @@ cdef nogil:
pysema = <PyThread_type_lock>gsema
PyThread_release_lock(pysema)
# ---- time ----
IF POSIX:
void nanosleep(uint64_t dt):
cdef timespec ts
......@@ -186,6 +197,58 @@ cdef nogil:
panic("pyxgo: thread: nanotime: time overflow")
return <uint64_t>t_ns
# ---- IO ----
struct IOH:
int sysfd
_libgolang_ioh* io_open(int *out_syserr, const char *path, int flags, mode_t mode):
sysfd = syscall.Open(path, flags, mode)
if sysfd < 0:
out_syserr[0] = sysfd
return NULL
return io_fdopen(out_syserr, sysfd)
_libgolang_ioh* io_fdopen(int *out_syserr, int sysfd):
if sysfd < 0:
out_syserr[0] = -EBADF
return NULL
ioh = <IOH*>calloc(1, sizeof(IOH))
if ioh == NULL:
panic("out of memory")
ioh.sysfd = sysfd
out_syserr[0] = 0
return <_libgolang_ioh*>ioh
int io_close(_libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
syserr = syscall.Close(ioh.sysfd)
ioh.sysfd = -1
return syserr
void io_free(_libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
bzero(ioh, sizeof(IOH))
free(ioh)
int io_sysfd(_libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
return ioh.sysfd
int io_read(_libgolang_ioh* _ioh, void *buf, size_t count):
ioh = <IOH*>_ioh
return syscall.Read(ioh.sysfd, buf, count)
int io_write(_libgolang_ioh* _ioh, const void *buf, size_t count):
ioh = <IOH*>_ioh
return syscall.Write(ioh.sysfd, buf, count)
int io_fstat(struct_stat* out_st, _libgolang_ioh* _ioh):
ioh = <IOH*>_ioh
return syscall.Fstat(ioh.sysfd, out_st)
# XXX const
_libgolang_runtime_ops thread_ops = _libgolang_runtime_ops(
......@@ -197,6 +260,14 @@ cdef nogil:
sema_release = sema_release,
nanosleep = nanosleep,
nanotime = nanotime,
io_open = io_open,
io_fdopen = io_fdopen,
io_close = io_close,
io_free = io_free,
io_sysfd = io_sysfd,
io_read = io_read,
io_write = io_write,
io_fstat = io_fstat,
)
from cpython cimport PyCapsule_New
......
......@@ -203,6 +203,7 @@ setup(
'golang/errors.cpp',
'golang/fmt.cpp',
'golang/io.cpp',
'golang/os.cpp',
'golang/strings.cpp',
'golang/sync.cpp',
'golang/time.cpp'],
......@@ -216,6 +217,7 @@ setup(
'golang/errors.h',
'golang/fmt.h',
'golang/io.h',
'golang/os.h',
'golang/strings.h',
'golang/sync.h',
'golang/time.h'],
......@@ -278,6 +280,10 @@ setup(
Ext('golang._io',
['golang/_io.pyx']),
Ext('golang._os_test',
['golang/_os_test.pyx',
'golang/os_test.cpp']),
Ext('golang._strings_test',
['golang/_strings_test.pyx',
'golang/strings_test.cpp']),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment