Commit 2a359791 authored by Kirill Smelkov's avatar Kirill Smelkov

context: Move/Port context package to C++/Pyx nogil

Provide context-related functionality that can be used directly from C++
and Pyx/nogil codes. Python-level classes and functions become small
wrappers around pyx/nogil ones.

Like with timers (b073f6df "time: Move/Port timers to C++/Pyx nogil")
and interfaces (5a99b769 "libgolang: Start providing interfaces")
memory for objects dynamically allocated on heap is managed
automatically.
parent 9785f2d3
include COPYING README.rst CHANGELOG.rst tox.ini pyproject.toml trun include COPYING README.rst CHANGELOG.rst tox.ini pyproject.toml trun
include golang/libgolang.h include golang/libgolang.h
include golang/runtime/libgolang.cpp include golang/runtime/libgolang.cpp
include golang/context.h
include golang/context.cpp
include golang/cxx.h include golang/cxx.h
include golang/errors.h include golang/errors.h
include golang/errors.cpp include golang/errors.cpp
......
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package context mirrors and amends Go package context.
- `Context` represents operational context that carries deadline, cancellation
signal and immutable context-local key -> value dict.
- `background` returns empty context that is never canceled.
- `with_cancel` creates new context that can be canceled on its own.
- `with_deadline` creates new context with deadline.
- `with_timeout` creates new context with timeout.
- `with_value` creates new context with attached key=value.
- `merge` creates new context from 2 parents(*).
See also https://golang.org/pkg/context for Go context package documentation.
See also https://blog.golang.org/context for overview.
(*) not provided in Go version.
"""
from golang cimport chan, structZ, error, refptr, interface
from golang cimport cxx
from libcpp.utility cimport pair
# XXX for std::function cython does not provide operator() and =nullptr
#from libcpp.functional cimport function
#ctypedef function[void()] cancelFunc
cdef extern from "<functional>" namespace "std" nogil:
cppclass cancelFunc "std::function<void()>":
void operator() ()
void operator= (nullptr_t)
cdef extern from "golang/context.h" namespace "golang::context" nogil:
cppclass _Context:
double deadline()
chan[structZ] done()
error err()
interface value(const void *key)
cppclass Context (refptr[_Context]):
# Context.X = Context->X in C++
double deadline "_ptr()->deadline" ()
chan[structZ] done "_ptr()->done" ()
error err "_ptr()->err" ()
interface value "_ptr()->value" (const void *key)
Context background()
error canceled
error deadlineExceeded
pair[Context, cancelFunc] with_cancel (Context parent)
Context with_value (Context parent, const void *key, interface value)
pair[Context, cancelFunc] with_deadline (Context parent, double deadline)
pair[Context, cancelFunc] with_timeout (Context parent, double timeout)
pair[Context, cancelFunc] merge (Context parent1, Context parent2)
# for testing
cxx.set[Context] _tctxchildren(Context ctx)
...@@ -18,54 +18,142 @@ ...@@ -18,54 +18,142 @@
# #
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
"""_context.pyx implements context.py - see context.py for package overview.""" """_context.pyx implements context.pyx - see _context.pxd for package overview."""
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang import go as pygo, select as pyselect, default as pydefault, nilchan as pynilchan from golang cimport pychan, nil, _interface, gobject, newref, adoptref, topyexc
from golang import _sync as _pysync # avoid cycle: context -> sync -> context from golang cimport cxx, time
from golang import time as pytime from cython cimport final, internal
from cython.operator cimport typeid
from golang cimport pychan from libc.math cimport INFINITY
from golang cimport time from cpython cimport PyObject, Py_INCREF, Py_DECREF
from cython cimport final from libcpp.cast cimport static_cast, dynamic_cast
# Context is the interface that every context must implement. # _frompyx indicates that a constructur is called from pyx code
cdef object _frompyx = object()
# _newPyCtx creates new PyContext wrapping ctx.
cdef PyContext _newPyCtx(Context ctx):
cdef PyContext pyctx = PyContext.__new__(PyContext, _frompyx)
pyctx.ctx = ctx
pyctx._pydone = pychan.from_chan_structZ(ctx.done())
return pyctx
# Context represents operational context.
# #
# A context carries deadline, cancellation signal and immutable context-local # A context carries deadline, cancellation signal and immutable context-local
# key -> value dict. # key -> value dict.
@final
cdef class PyContext: cdef class PyContext:
cdef Context ctx
cdef pychan _pydone # pychan wrapping ctx.done()
def __cinit__(PyContext pyctx, object bywho):
if bywho is not _frompyx:
raise AssertionError("Context must not be instantiated by user")
def __dealloc__(PyContext pyctx):
ctx = NULL
# deadline() returns context deadline or None, if there is no deadline. # deadline() returns context deadline or None, if there is no deadline.
def deadline(PyContext ctx): # -> time | None def deadline(PyContext pyctx): # -> time | None
raise NotImplementedError() d = pyctx.ctx.deadline()
if d == INFINITY:
return None
return d
# done returns channel that is closed when the context is canceled. # done returns channel that is closed when the context is canceled.
def done(PyContext ctx): # -> pychan(dtype='C.structZ') def done(PyContext pyctx): # -> pychan(dtype='C.structZ')
raise NotImplementedError() return pyctx._pydone
# err returns None if done is not yet closed, or error that explains why context was canceled. # err returns None if done is not yet closed, or error that explains why context was canceled.
def err(PyContext ctx): # -> error def err(PyContext pyctx): # -> error
raise NotImplementedError() with nogil:
err = pyctx.ctx.err()
if err == nil:
return None
if err.eq(canceled):
return pycanceled
if err.eq(deadlineExceeded):
return pydeadlineExceeded
return RuntimeError(err.Error())
# value returns value associated with key, or None, if context has no key. # value returns value associated with key, or None, if context has no key.
# #
# NOTE keys are compared by object identity, _not_ equality. # NOTE keys are compared by object identity, _not_ equality.
# For example two different object instances that are treated by Python as # For example two different object instances that are treated by Python as
# equal will be considered as _different_ keys. # equal will be considered as _different_ keys.
def value(PyContext ctx, object key): # -> value | None def value(PyContext pyctx, object key): # -> value | None
raise NotImplementedError() cdef _PyValue *_pyvalue
xvalue = pyctx.ctx.value(<void *>key)
if xvalue == nil:
return None
_pyvalue = dynamic_cast[_pPyValue](xvalue._ptr())
if _pyvalue == nil:
raise RuntimeError("value is of unexpected C type: %s" % typeid(xvalue).name())
return <object>_pyvalue.pyobj
# _PyValue holds python-level value in a context.
ctypedef _PyValue *_pPyValue # https://github.com/cython/cython/issues/534
cdef cppclass _PyValue (_interface, gobject) nogil:
PyObject *pyobj # holds 1 reference
__init__(object obj) with gil:
Py_INCREF(obj)
this.pyobj = <PyObject*>obj
void incref():
gobject.incref()
void decref():
cdef _PyValue *self = this # https://github.com/cython/cython/issues/3233
if __decref():
del self
__dealloc__():
with gil:
obj = <object>this.pyobj
this.pyobj = NULL
Py_DECREF(obj)
# _newPyCancel creates new _PyCancel wrapping cancel.
cdef _PyCancel _newPyCancel(cancelFunc cancel):
cdef _PyCancel pycancel = _PyCancel.__new__(_PyCancel, _frompyx)
pycancel.cancel = cancel
return pycancel
# _PyCancel wraps C cancel func.
@final
@internal
cdef class _PyCancel:
cdef cancelFunc cancel
def __cinit__(_PyCancel pycancel, object bywho):
if bywho is not _frompyx:
raise AssertionError("_PyCancel must not be instantiated by user")
def __dealloc__(_PyCancel pycancel):
pycancel.cancel = nil
def __call__(_PyCancel pycancel):
with nogil:
pycancel.cancel()
# background returns empty context that is never canceled. # background returns empty context that is never canceled.
def pybackground(): # -> Context def pybackground(): # -> Context
return _pybackground return _pybackground
cdef PyContext _pybackground = _newPyCtx(background())
# canceled is the error returned by Context.err when context is canceled. # canceled is the error returned by Context.err when context is canceled.
pycanceled = RuntimeError("context canceled") pycanceled = RuntimeError(canceled.Error())
# deadlineExceeded is the error returned by Context.err when time goes past context's deadline. # deadlineExceeded is the error returned by Context.err when time goes past context's deadline.
pydeadlineExceeded = RuntimeError("deadline exceeded") pydeadlineExceeded = RuntimeError(deadlineExceeded.Error())
# with_cancel creates new context that can be canceled on its own. # with_cancel creates new context that can be canceled on its own.
...@@ -75,16 +163,25 @@ pydeadlineExceeded = RuntimeError("deadline exceeded") ...@@ -75,16 +163,25 @@ pydeadlineExceeded = RuntimeError("deadline exceeded")
# #
# The caller should explicitly call cancel to release context resources as soon # The caller should explicitly call cancel to release context resources as soon
# the context is no longer needed. # the context is no longer needed.
def pywith_cancel(parent): # -> ctx, cancel def pywith_cancel(PyContext pyparent): # -> ctx, cancel
ctx = _CancelCtx(parent) with nogil:
return ctx, lambda: ctx._cancel(pycanceled) _ = with_cancel(pyparent.ctx)
cdef Context ctx = _.first
cdef cancelFunc cancel = _.second
return _newPyCtx(ctx), _newPyCancel(cancel)
# with_value creates new context with key=value. # with_value creates new context with key=value.
# #
# Returned context inherits from parent and in particular has all other # Returned context inherits from parent and in particular has all other
# (key, value) pairs provided by parent. # (key, value) pairs provided by parent.
def pywith_value(parent, object key, object value): # -> ctx ctypedef _interface *_pinterface # https://github.com/cython/cython/issues/534
return _ValueCtx(key, value, parent) def pywith_value(PyContext pyparent, object key, object value): # -> ctx
pyvalue = adoptref(new _PyValue(value))
cdef _interface *_ipyvalue = static_cast[_pinterface](pyvalue._ptr())
cdef interface ipyvalue = <interface>newref(_ipyvalue)
with nogil:
ctx = with_value(pyparent.ctx, <void *>key, ipyvalue)
return _newPyCtx(ctx)
# with_deadline creates new context with deadline. # with_deadline creates new context with deadline.
# #
...@@ -94,27 +191,22 @@ def pywith_value(parent, object key, object value): # -> ctx ...@@ -94,27 +191,22 @@ def pywith_value(parent, object key, object value): # -> ctx
# #
# The caller should explicitly call cancel to release context resources as soon # The caller should explicitly call cancel to release context resources as soon
# the context is no longer needed. # the context is no longer needed.
def pywith_deadline(parent, double deadline): # -> ctx, cancel def pywith_deadline(PyContext pyparent, double deadline): # -> ctx, cancel
# parent's deadline is before deadline -> just use parent with nogil:
pdead = parent.deadline() _ = with_deadline(pyparent.ctx, deadline)
if pdead is not None and pdead <= deadline: cdef Context ctx = _.first
return pywith_cancel(parent) cdef cancelFunc cancel = _.second
return _newPyCtx(ctx), _newPyCancel(cancel)
# timeout <= 0 -> already canceled
timeout = deadline - time.now()
if timeout <= 0:
ctx, cancel = pywith_cancel(parent)
cancel()
return ctx, cancel
ctx = _TimeoutCtx(timeout, deadline, parent)
return ctx, lambda: ctx._cancel(pycanceled)
# with_timeout creates new context with timeout. # with_timeout creates new context with timeout.
# #
# it is shorthand for with_deadline(parent, now+timeout). # it is shorthand for with_deadline(parent, now+timeout).
def pywith_timeout(parent, double timeout): # -> ctx, cancel def pywith_timeout(PyContext pyparent, double timeout): # -> ctx, cancel
return pywith_deadline(parent, time.now() + timeout) with nogil:
_ = with_timeout(pyparent.ctx, timeout)
cdef Context ctx = _.first
cdef cancelFunc cancel = _.second
return _newPyCtx(ctx), _newPyCancel(cancel)
# merge merges 2 contexts into 1. # merge merges 2 contexts into 1.
# #
...@@ -129,215 +221,28 @@ def pywith_timeout(parent, double timeout): # -> ctx, cancel ...@@ -129,215 +221,28 @@ def pywith_timeout(parent, double timeout): # -> ctx, cancel
# #
# Note: on Go side merge is not part of stdlib context and is provided by # Note: on Go side merge is not part of stdlib context and is provided by
# https://godoc.org/lab.nexedi.com/kirr/go123/xcontext#hdr-Merging_contexts # https://godoc.org/lab.nexedi.com/kirr/go123/xcontext#hdr-Merging_contexts
def pymerge(parent1, parent2): # -> ctx, cancel def pymerge(PyContext parent1, PyContext parent2): # -> ctx, cancel
ctx = _CancelCtx(parent1, parent2) with nogil:
return ctx, lambda: ctx._cancel(pycanceled) _ = merge(parent1.ctx, parent2.ctx)
cdef Context ctx = _.first
# -------- cdef cancelFunc cancel = _.second
return _newPyCtx(ctx), _newPyCancel(cancel)
# _PyBackground implements root context that is never canceled.
@final
cdef class _PyBackground:
def done(bg):
return _nilchanZ
def err(bg):
return None
def value(bg, key):
return None
def deadline(bg):
return None
_pybackground = _PyBackground()
_nilchanZ = pychan.nil('C.structZ')
# _BaseCtx is the common base for Contexts implemented in this package.
cdef class _BaseCtx:
# parents of this context - either _BaseCtx* or generic Context.
# does not change after setup.
cdef tuple _parentv
cdef object _mu # sync.PyMutex
cdef set _children # children of this context - we propagate cancel there (all _BaseCtx)
cdef object _err
cdef object _done # pychan | None
def __init__(_BaseCtx ctx, done, *parentv): # XXX done -> pychan?
ctx._parentv = parentv
ctx._mu = _pysync.PyMutex()
ctx._children = set()
ctx._err = None
# pychan: if context can be canceled on its own
# None: if context can not be canceled on its own
ctx._done = done
if done is None:
assert len(parentv) == 1
ctx._propagateCancel()
def done(_BaseCtx ctx):
if ctx._done is not None:
return ctx._done
return ctx._parentv[0].done()
def err(_BaseCtx ctx):
with ctx._mu:
return ctx._err
# value returns value for key from one of its parents.
# this behaviour is inherited by all contexts except _ValueCtx who amends it.
def value(_BaseCtx ctx, object key):
for parent in ctx._parentv:
v = parent.value(key)
if v is not None:
return v
return None
# deadline returns the earliest deadline of parents. # ---- for tests ----
# this behaviour is inherited by all contexts except _TimeoutCtx who overrides it.
def deadline(_BaseCtx ctx):
d = None
for parent in ctx._parentv:
pd = parent.deadline()
if d is None or (pd is not None and pd < d):
d = pd
return d
# _cancel cancels ctx and its children. def _tctxAssertChildren(PyContext pyctx, set pychildrenOK):
def _cancel(_BaseCtx ctx, err): # pychildrenOK must be set[PyContext]
return ctx._cancelFrom(None, err) for _ in pychildrenOK:
assert isinstance(_, PyContext)
# _cancelFrom cancels ctx and its children.
# if cancelFrom != None it indicates which ctx parent cancellation was the cause for ctx cancel.
def _cancelFrom(_BaseCtx ctx, cancelFrom, err):
with ctx._mu:
if ctx._err is not None:
return # already canceled
ctx._err = err
children = ctx._children; ctx._children = set()
if ctx._done is not None:
ctx._done.close()
# no longer need to propagate cancel from parent after we are canceled
for parent in ctx._parentv:
if parent is cancelFrom:
continue
if isinstance(parent, _BaseCtx):
_parent = <_BaseCtx>parent
with _parent._mu:
if ctx in _parent._children:
_parent._children.remove(ctx)
# propagate cancel to children
for child in children:
child._cancelFrom(ctx, err)
# propagateCancel establishes setup so that whenever a parent is canceled,
# ctx and its children are canceled too.
def _propagateCancel(_BaseCtx ctx):
pforeignv = [] # parents with !pynilchan .done() for foreign contexts
for parent in ctx._parentv:
# if parent can never be canceled (e.g. it is background) - we
# don't need to propagate cancel from it.
pdone = parent.done()
if pdone == pynilchan:
continue
# parent is cancellable - glue to propagate cancel from it to us
if isinstance(parent, _BaseCtx):
_parent = <_BaseCtx>parent
with _parent._mu:
if _parent._err is not None:
ctx._cancel(_parent._err)
else:
_parent._children.add(ctx)
else:
if _ready(pdone):
ctx._cancel(parent.err())
else:
pforeignv.append(parent)
if len(pforeignv) == 0:
return
# there are some foreign contexts to propagate cancel from
def _():
_, _rx = pyselect(
ctx._done.recv, # 0
*[_.done().recv for _ in pforeignv] # 1 + ...
)
# 0. nothing - already canceled
if _ > 0:
ctx._cancel(pforeignv[_-1].err())
pygo(_)
# _CancelCtx is context that can be canceled.
cdef class _CancelCtx(_BaseCtx):
def __init__(_CancelCtx ctx, *parentv):
super(_CancelCtx, ctx).__init__(pychan(dtype='C.structZ'), *parentv)
# _ValueCtx is context that carries key -> value.
cdef class _ValueCtx(_BaseCtx):
# (key, value) specific to this context.
# the rest of the keys are inherited from parents.
# does not change after setup.
cdef object _key
cdef object _value
def __init__(_ValueCtx ctx, object key, object value, parent):
super(_ValueCtx, ctx).__init__(None, parent)
ctx._key = key
ctx._value = value
def value(_ValueCtx ctx, object key):
if ctx._key is key:
return ctx._value
return super(_ValueCtx, ctx).value(key)
# _TimeoutCtx is context that is canceled on timeout.
cdef class _TimeoutCtx(_CancelCtx):
cdef double _deadline
cdef object _timer # pytime.Timer
def __init__(_TimeoutCtx ctx, double timeout, double deadline, parent):
super(_TimeoutCtx, ctx).__init__(parent)
assert timeout > 0
ctx._deadline = deadline
ctx._timer = pytime.after_func(timeout, lambda: ctx._cancel(pydeadlineExceeded))
def deadline(_TimeoutCtx ctx):
return ctx._deadline
# cancel -> stop timer
def _cancelFrom(_TimeoutCtx ctx, cancelFrom, err):
super(_TimeoutCtx, ctx)._cancelFrom(cancelFrom, err)
ctx._timer.stop()
# _ready returns whether channel ch is ready.
def _ready(pychan ch):
_, _rx = pyselect(
ch.recv, # 0
pydefault, # 1
)
if _ == 0:
return True
if _ == 1:
return False
cdef cxx.set[Context] childrenOK
for _ in pychildrenOK:
childrenOK.insert((<PyContext>_).ctx)
# ---- for tests ---- cdef cxx.set[Context] children = _tctxchildren_pyexc(pyctx.ctx)
if children != childrenOK:
raise AssertionError("context children differ") # TODO provide details
def _tctxchildren(_BaseCtx ctx): # -> ctx._children cdef cxx.set[Context] _tctxchildren_pyexc(Context ctx) nogil except +topyexc:
return ctx._children return _tctxchildren(ctx)
// Copyright (C) 2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package context mirrors and amends Go package context.
// See context.h for package overview.
#include "golang/context.h"
#include "golang/cxx.h"
#include "golang/errors.h"
#include "golang/sync.h"
#include "golang/time.h"
#include <math.h>
#include <vector>
using std::pair;
using std::make_pair;
using std::function;
using std::vector;
// golang::context::
namespace golang {
namespace context {
using cxx::set;
static bool _ready(chan<structZ> ch);
// _Background implements root context that is never canceled.
struct _Background final : _Context, object {
void incref() {
object::incref();
}
void decref() {
if (__decref())
delete this;
}
double deadline() { return INFINITY; }
chan<structZ> done() { return NULL; }
error err() { return NULL; }
interface value(const void *key) { return NULL; }
};
static Context _background = adoptref(static_cast<_Context *>(new _Background()));
Context background() {
return _background;
}
const error canceled = errors::New("context canceled");
const error deadlineExceeded = errors::New("deadline exceeded");
// _BaseCtx is the common base for Contexts implemented in this package.
struct _BaseCtx : _Context, object {
// parents of this context - either _BaseCtx* or generic Context.
// does not change after setup.
vector<Context> _parentv;
sync::Mutex _mu;
set<refptr<_BaseCtx>> _children; // children of this context - we propagate cancel there
error _err;
chan<structZ> _done; // chan | nil (nil: => see parent)
void incref() {
object::incref();
}
void decref() {
if (__decref())
delete this;
}
virtual ~_BaseCtx() {}
_BaseCtx(chan<structZ> done, const vector<Context>& parentv) {
_BaseCtx& ctx = *this;
ctx._parentv = parentv;
// chan: if context can be canceled on its own
// nil: if context can not be canceled on its own
ctx._done = done;
if (done == NULL) {
if (parentv.size() != 1)
panic("BUG: _BaseCtx: done==nil, but len(parentv) != 1");
}
ctx._propagateCancel();
}
chan<structZ> done() {
_BaseCtx& ctx = *this;
if (ctx._done != NULL)
return ctx._done;
return ctx._parentv[0]->done();
}
error err() {
_BaseCtx& ctx = *this;
ctx._mu.lock();
defer([&]() {
ctx._mu.unlock();
});
return ctx._err;
}
interface value(const void *key) {
_BaseCtx& ctx = *this;
for (auto parent : ctx._parentv) {
interface v = parent->value(key);
if (v != NULL)
return v;
}
return NULL;
}
double deadline() {
_BaseCtx& ctx = *this;
double d = INFINITY;
for (auto parent : ctx._parentv) {
double pd = parent->deadline();
if (pd < d)
d = pd;
}
return d;
}
// _cancel cancels ctx and its children.
void _cancel(error err) {
_BaseCtx& ctx = *this;
return ctx._cancelFrom(NULL, err);
}
// _cancelFrom cancels ctx and its children.
// if cancelFrom != nil it indicates which ctx parent cancellation was the cause for ctx cancel.
virtual void _cancelFrom(Context cancelFrom, error err) {
_BaseCtx& ctx = *this;
set<refptr<_BaseCtx>> children;
ctx._mu.lock();
if (ctx._err != NULL) {
ctx._mu.unlock();
return; // already canceled
}
ctx._err = err;
ctx._children.swap(children);
ctx._mu.unlock();
if (ctx._done != NULL)
ctx._done.close();
// no longer need to propagate cancel from parent after we are canceled
refptr<_BaseCtx> bctx = newref(&ctx);
for (auto parent : ctx._parentv) {
if (parent == cancelFrom)
continue;
_BaseCtx *_parent = dynamic_cast<_BaseCtx *>(parent._ptr());
if (_parent != NULL) {
_parent->_mu.lock();
_parent->_children.erase(bctx);
_parent->_mu.unlock();
}
}
// propagate cancel to children
Context cctx = newref(static_cast<_Context*>(&ctx));
for (auto child : children)
child->_cancelFrom(cctx, err);
}
// _propagateCancel establishes setup so that whenever a parent is canceled,
// ctx and its children are canceled too.
void _propagateCancel() {
_BaseCtx& ctx = *this;
refptr<_BaseCtx> bctx = newref(&ctx);
vector<Context> pforeignv; // parents with !nil .done() for foreign contexts
for (auto parent : ctx._parentv) {
// if parent can never be canceled (e.g. it is background) - we
// don't need to propagate cancel from it.
chan<structZ> pdone = parent->done();
if (pdone == NULL)
continue;
// parent is cancellable - glue to propagate cancel from it to us
_BaseCtx *_parent = dynamic_cast<_BaseCtx *>(parent._ptr());
if (_parent != NULL) {
_parent->_mu.lock();
if (_parent->_err != NULL)
ctx._cancel(_parent->_err);
else
_parent->_children.insert(bctx);
_parent->_mu.unlock();
}
else {
if (_ready(pdone))
ctx._cancel(parent->err());
else
pforeignv.push_back(parent);
}
}
if (pforeignv.size() == 0)
return;
// there are some foreign contexts to propagate cancel from
go([bctx,pforeignv]() {
vector<_selcase> sel(1+pforeignv.size());
sel[0] = bctx->_done.recvs(); // 0
for (size_t i=0; i<pforeignv.size(); i++)
sel[1+i] = pforeignv[i]->done().recvs(); // 1 + ...
int _ = select(sel);
// 0. nothing - already canceled
if (_ > 0)
bctx->_cancel(pforeignv[_-1]->err());
});
}
};
// _CancelCtx is context that can be canceled.
struct _CancelCtx : _BaseCtx {
_CancelCtx(const vector<Context>& parentv)
: _BaseCtx(makechan<structZ>(), parentv) {}
};
// _ValueCtx is context that carries key -> value.
struct _ValueCtx : _BaseCtx {
// (key, value) specific to this context.
// the rest of the keys are inherited from parents.
// does not change after setup.
const void *_key;
interface _value;
_ValueCtx(const void *key, interface value, Context parent)
: _BaseCtx(NULL, {parent}) {
_ValueCtx& ctx = *this;
ctx._key = key;
ctx._value = value;
}
interface value(const void *key) {
_ValueCtx& ctx = *this;
if (ctx._key == key)
return ctx._value;
return _BaseCtx::value(key);
}
};
// _TimeoutCtx is context that is canceled on timeout.
struct _TimeoutCtx : _CancelCtx {
double _deadline;
time::Timer _timer;
_TimeoutCtx(double timeout, double deadline, Context parent)
: _CancelCtx({parent}) {
_TimeoutCtx& ctx = *this;
if (timeout <= 0)
panic("BUG: _TimeoutCtx: timeout <= 0");
ctx._deadline = deadline;
refptr<_TimeoutCtx> ctxref = newref(&ctx); // pass ctx reference to timer
ctx._timer = time::after_func(timeout, [ctxref]() { ctxref->_cancel(deadlineExceeded); });
}
double deadline() {
_TimeoutCtx& ctx = *this;
return ctx._deadline;
}
// cancel -> stop timer
void _cancelFrom(Context cancelFrom, error err) {
_TimeoutCtx& ctx = *this;
_CancelCtx::_cancelFrom(cancelFrom, err);
ctx._timer->stop();
}
};
pair<Context, function<void()>>
with_cancel(Context parent) {
refptr<_CancelCtx> cctx = adoptref(new _CancelCtx({parent}));
Context ctx = newref (static_cast<_Context*>(cctx._ptr()));
return make_pair(ctx, [cctx]() { cctx->_cancel(canceled); });
}
Context
with_value(Context parent, const void *key, interface value) {
return adoptref(static_cast<_Context*>(new _ValueCtx(key, value, parent)));
}
pair<Context, function<void()>>
with_deadline(Context parent, double deadline) {
// parent's deadline is before deadline -> just use parent
double pdead = parent->deadline();
if (pdead <= deadline)
return with_cancel(parent);
// timeout <= 0 -> already canceled
double timeout = deadline - time::now();
if (timeout <= 0) {
Context ctx;
function<void()> cancel;
tie(ctx, cancel) = with_cancel(parent);
cancel();
return make_pair(ctx, cancel);
}
refptr<_TimeoutCtx> tctx = adoptref(new _TimeoutCtx(timeout, deadline, parent));
Context ctx = newref (static_cast<_Context*>(tctx._ptr()));
return make_pair(ctx, [tctx]() { tctx->_cancel(canceled); });
}
pair<Context, function<void()>>
with_timeout(Context parent, double timeout) {
return with_deadline(parent, time::now() + timeout);
}
pair<Context, function<void()>>
merge(Context parent1, Context parent2) {
refptr<_CancelCtx> cctx = adoptref(new _CancelCtx({parent1, parent2}));
Context ctx = newref (static_cast<_Context*>(cctx._ptr()));
return make_pair(ctx, [cctx]() { cctx->_cancel(canceled); });
}
// _ready returns whether channel ch is ready.
static bool _ready(chan<structZ> ch) {
int _ = select({
ch.recvs(), // 0
_default, // 1
});
return (_ == 0);
}
// _tctxchildren returns context's children, assuming context is instance of _BaseCtx.
set<Context> _tctxchildren(Context ctx) {
_BaseCtx *_bctx = dynamic_cast<_BaseCtx*>(ctx._ptr());
if (_bctx == NULL)
panic("context is not instance of golang.context._BaseCtx");
set<Context> children;
_bctx->_mu.lock();
defer([&]() {
_bctx->_mu.unlock();
});
for (auto bchild : _bctx->_children) {
Context cchild = newref(static_cast<_Context*>(bchild._ptr()));
children.insert(cchild);
}
return children;
}
}} // golang::context::
#ifndef _NXD_LIBGOLANG_CONTEXT_H
#define _NXD_LIBGOLANG_CONTEXT_H
// Copyright (C) 2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package context mirrors and amends Go package context.
//
// - `Context` represents operational context that carries deadline, cancellation
// signal and immutable context-local key -> value dict.
// - `background` returns empty context that is never canceled.
// - `with_cancel` creates new context that can be canceled on its own.
// - `with_deadline` creates new context with deadline.
// - `with_timeout` creates new context with timeout.
// - `with_value` creates new context with attached key=value.
// - `merge` creates new context from 2 parents(*).
//
// See also https://golang.org/pkg/context for Go context package documentation.
// See also https://blog.golang.org/context for overview.
//
// (*) not provided in Go version.
#include <golang/libgolang.h>
#include <golang/cxx.h>
// golang::context::
namespace golang {
namespace context {
// Context is the interface that every context must implement.
//
// A context carries deadline, cancellation signal and immutable context-local
// key -> value dict.
struct _Context : _interface {
// deadline() returns context deadline or +inf, if there is no deadline.
virtual double deadline() = 0; // -> time | INFINITY
// done returns channel that is closed when the context is canceled.
virtual chan<structZ> done() = 0;
// err returns nil if done is not yet closed, or error that explains why context was canceled.
virtual error err() = 0;
// value returns value associated with key, or nil, if context has no key.
virtual interface value(const void *key) = 0; // -> value | nil
};
typedef refptr<_Context> Context;
// background returns empty context that is never canceled.
LIBGOLANG_API Context background();
// canceled is the error returned by Context.err when context is canceled.
extern LIBGOLANG_API const error canceled;
// deadlineExceeded is the error returned by Context.err when time goes past context's deadline.
extern LIBGOLANG_API const error deadlineExceeded;
// with_cancel creates new context that can be canceled on its own.
//
// Returned context inherits from parent and in particular is canceled when
// parent is done.
//
// The caller should explicitly call cancel to release context resources as soon
// the context is no longer needed.
LIBGOLANG_API std::pair<Context, std::function<void()>>
with_cancel(Context parent); // -> ctx, cancel
// with_value creates new context with key=value.
//
// Returned context inherits from parent and in particular has all other
// (key, value) pairs provided by parent.
LIBGOLANG_API Context
with_value(Context parent, const void *key, interface value); // -> ctx
// with_deadline creates new context with deadline.
//
// The deadline of created context is the earliest of provided deadline or
// deadline of parent. Created context will be canceled when time goes past
// context deadline or cancel called, whichever happens first.
//
// The caller should explicitly call cancel to release context resources as soon
// the context is no longer needed.
LIBGOLANG_API std::pair<Context, std::function<void()>>
with_deadline(Context parent, double deadline); // -> ctx, cancel
// with_timeout creates new context with timeout.
//
// it is shorthand for with_deadline(parent, now+timeout).
LIBGOLANG_API std::pair<Context, std::function<void()>>
with_timeout(Context parent, double timeout); // -> ctx, cancel
// merge merges 2 contexts into 1.
//
// The result context:
//
// - is done when parent1 or parent2 is done, or cancel called, whichever happens first,
// - has deadline = min(parent1.Deadline, parent2.Deadline),
// - has associated values merged from parent1 and parent2, with parent1 taking precedence.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
//
// Note: on Go side merge is not part of stdlib context and is provided by
// https://godoc.org/lab.nexedi.com/kirr/go123/xcontext#hdr-Merging_contexts
LIBGOLANG_API std::pair<Context, std::function<void()>>
merge(Context parent1, Context parent2); // -> ctx, cancel
// for testing
LIBGOLANG_API cxx::set<Context> _tctxchildren(Context ctx);
}} // golang::context::
#endif // _NXD_LIBGOLANG_CONTEXT_H
# cython: language_level=2
# Copyright (C) 2019 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package context mirrors and amends Go package context.
See _context.pxd for package documentation.
"""
# redirect cimport: golang.context -> golang._context (see __init__.pxd for rationale)
from golang._context cimport *
...@@ -20,18 +20,19 @@ ...@@ -20,18 +20,19 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from golang import context, _context, time, nilchan from golang import nilchan, select, default
from golang._context import _tctxchildren as tctxchildren, _ready as ready from golang import context, _context, time
from golang._context import _tctxAssertChildren as tctxAssertChildren
from golang.time_test import dt from golang.time_test import dt
# assertCtx asserts on state of _BaseCtx* # assertCtx asserts on state of _BaseCtx*
def assertCtx(ctx, children, deadline=None, err=None, done=False): def assertCtx(ctx, children, deadline=None, err=None, done=False):
assert isinstance(ctx, _context._BaseCtx) assert isinstance(ctx, _context.PyContext)
assert ctx.deadline() == deadline assert ctx.deadline() == deadline
assert ctx.err() is err assert ctx.err() is err
ctxdone = ctx.done() ctxdone = ctx.done()
assert ready(ctxdone) == done assert ready(ctxdone) == done
assert tctxchildren(ctx) == children tctxAssertChildren(ctx, children)
for i in range(10): # repeated .done() returns the same pyobject for i in range(10): # repeated .done() returns the same pyobject
assert ctx.done() is ctxdone assert ctx.done() is ctxdone
...@@ -262,3 +263,17 @@ def test_deadline(): ...@@ -262,3 +263,17 @@ def test_deadline():
time.sleep(11*dt) time.sleep(11*dt)
assertCtx(ctx, Z, deadline=d, err=D, done=Y) assertCtx(ctx, Z, deadline=d, err=D, done=Y)
# ---- misc ----
# _ready returns whether channel ch is ready.
def ready(ch):
_, _rx = select(
ch.recv, # 0
default, # 1
)
if _ == 0:
return True
if _ == 1:
return False
...@@ -166,6 +166,9 @@ def Extension(name, sources, **kw): ...@@ -166,6 +166,9 @@ def Extension(name, sources, **kw):
dependv.append('%s/golang/libgolang.h' % pygo) dependv.append('%s/golang/libgolang.h' % pygo)
dependv.append('%s/golang/_golang.pxd' % pygo) dependv.append('%s/golang/_golang.pxd' % pygo)
dependv.append('%s/golang/__init__.pxd' % pygo) dependv.append('%s/golang/__init__.pxd' % pygo)
dependv.append('%s/golang/context.h' % pygo)
dependv.append('%s/golang/context.pxd' % pygo)
dependv.append('%s/golang/_context.pxd' % pygo)
dependv.append('%s/golang/cxx.h' % pygo) dependv.append('%s/golang/cxx.h' % pygo)
dependv.append('%s/golang/cxx.pxd' % pygo) dependv.append('%s/golang/cxx.pxd' % pygo)
dependv.append('%s/golang/errors.h' % pygo) dependv.append('%s/golang/errors.h' % pygo)
......
...@@ -193,11 +193,13 @@ setup( ...@@ -193,11 +193,13 @@ setup(
x_dsos = [DSO('golang.runtime.libgolang', x_dsos = [DSO('golang.runtime.libgolang',
['golang/runtime/libgolang.cpp', ['golang/runtime/libgolang.cpp',
'golang/context.cpp',
'golang/errors.cpp', 'golang/errors.cpp',
'golang/sync.cpp', 'golang/sync.cpp',
'golang/time.cpp'], 'golang/time.cpp'],
depends = [ depends = [
'golang/libgolang.h', 'golang/libgolang.h',
'golang/context.h',
'golang/cxx.h', 'golang/cxx.h',
'golang/errors.h', 'golang/errors.h',
'golang/sync.h', 'golang/sync.h',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment