Commit 0bf63dcb authored by Xavier Thompson's avatar Xavier Thompson

Refactor continuation.hpp introducing parent.hpp

parent 15de1efa
#ifndef TYPON_CORE_CONTINUATION_HPP_INCLUDED
#define TYPON_CORE_CONTINUATION_HPP_INCLUDED
#include <atomic>
#include <concepts>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
namespace typon
{
struct ForkList
{
struct Node
{
Node * _next;
std::coroutine_handle<> _coroutine;
std::exception_ptr * _exception;
};
Node * _first = nullptr;
template <typename Promise>
requires requires (Promise p)
{
{ p._exception } -> std::same_as<std::exception_ptr&>;
{ p._node } -> std::same_as<Node&>;
}
void insert(std::coroutine_handle<Promise> coroutine) noexcept
{
std::exception_ptr * exception = &(coroutine.promise()._exception);
coroutine.promise()._node = { _first, coroutine, exception };
_first = &(coroutine.promise()._node);
}
void check_exceptions()
{
for (auto node = _first; node != nullptr; node = node->_next)
{
std::exception_ptr & exception = *(node->_exception);
if (exception)
{
std::rethrow_exception(exception);
}
}
}
void clear() noexcept
{
auto next = _first;
while(next)
{
auto node = next;
next = next->_next;
node->_coroutine.destroy();
}
_first = nullptr;
}
};
struct Continuation
{
struct Data
{
using u64 = std::uint_fast64_t;
static constexpr u64 UMAX = std::numeric_limits<u64>::max();
std::coroutine_handle<> _coroutine;
std::coroutine_handle<> _continuation;
ForkList _children;
std::atomic<u64> _n = UMAX;
u64 _thefts = 0;
Data(std::coroutine_handle<> coroutine) noexcept
Continuation(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine)
{}
};
Data * _data;
Continuation() noexcept {}
Continuation(std::nullptr_t null) noexcept : _data(null) {}
template <typename Promise>
Continuation(std::coroutine_handle<Promise> coroutine) noexcept
: _data(&(coroutine.promise()._data))
{}
auto & thefts() noexcept
{
return _data->_thefts;
}
auto & children() noexcept
{
return _data->_children;
}
auto & n() noexcept
{
return _data->_n;
}
void resume()
{
_data->_coroutine.resume();
}
operator bool() noexcept
{
return _data;
}
operator std::coroutine_handle<>() noexcept
{
return _data->_coroutine;
}
std::coroutine_handle<> continuation() noexcept
{
if (_data->_coroutine.done())
{
return _data->_continuation;
}
return _data->_coroutine;
}
};
}
......
......@@ -4,8 +4,8 @@
#include <coroutine>
#include <cstdint>
#include <typon/core/continuation.hpp>
#include <typon/core/forked.hpp>
#include <typon/core/parent.hpp>
#include <typon/core/result.hpp>
#include <typon/core/scheduler.hpp>
......@@ -17,7 +17,7 @@ namespace typon
struct [[nodiscard]] Fork
{
struct promise_type;
using u64 = Continuation::Data::u64;
using u64 = Parent::u64;
std::coroutine_handle<promise_type> _coroutine;
......@@ -33,7 +33,7 @@ namespace typon
struct promise_type : Result<T>
{
Continuation _continuation;
Parent * _parent;
ForkList::Node _node;
Fork get_return_object() noexcept
......@@ -52,15 +52,15 @@ namespace typon
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
auto continuation = coroutine.promise()._continuation;
auto parent = coroutine.promise()._parent;
if (Scheduler::pop())
{
return continuation;
return parent->_coroutine;
}
u64 n = continuation.n().fetch_sub(1, std::memory_order_acq_rel);
u64 n = parent->_n.fetch_sub(1, std::memory_order_acq_rel);
if (n == 1)
{
return continuation.continuation();
return parent->continuation();
}
return std::noop_coroutine();
}
......@@ -87,21 +87,22 @@ namespace typon
template <typename Promise>
auto await_suspend(std::coroutine_handle<Promise> continuation) noexcept
{
_coroutine.promise()._continuation = continuation;
_thefts = _coroutine.promise()._continuation.thefts();
Parent * parent = &(continuation.promise()._data);
_coroutine.promise()._parent = parent;
_thefts = parent->_thefts;
std::coroutine_handle<> on_stack_handle = _coroutine;
Scheduler::push(continuation);
Scheduler::push(parent);
return on_stack_handle;
}
auto await_resume()
{
auto continuation = _coroutine.promise()._continuation;
bool stolen = continuation.thefts() > _thefts;
auto parent = _coroutine.promise()._parent;
bool stolen = parent->_thefts > _thefts;
if (stolen)
{
continuation.children().insert(_coroutine);
parent->_children.insert(_coroutine);
}
return Forked<T>(_coroutine, !stolen);
}
......
......@@ -4,7 +4,7 @@
#include <coroutine>
#include <utility>
#include <typon/core/continuation.hpp>
#include <typon/core/parent.hpp>
#include <typon/core/result.hpp>
......@@ -47,10 +47,10 @@ namespace typon
struct promise_type : Result<T>
{
using u64 = Continuation::Data::u64;
static constexpr u64 UMAX = Continuation::Data::UMAX;
using u64 = Parent::u64;
static constexpr u64 UMAX = Parent::UMAX;
Continuation::Data _data;
Parent _data;
promise_type() noexcept
: _data(std::coroutine_handle<promise_type>::from_promise(*this))
......@@ -76,7 +76,7 @@ namespace typon
{
struct awaitable
{
Continuation::Data & _data;
Parent & _data;
bool await_ready() noexcept
{
......@@ -120,7 +120,7 @@ namespace typon
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
Continuation::Data & data = coroutine.promise()._data;
Parent & data = coroutine.promise()._data;
u64 thefts = data._thefts;
if (thefts == 0)
{
......
#ifndef TYPON_CORE_PARENT_HPP_INCLUDED
#define TYPON_CORE_PARENT_HPP_INCLUDED
#include <atomic>
#include <concepts>
#include <coroutine>
#include <cstdint>
#include <exception>
#include <limits>
#include <vector>
#include <typon/core/continuation.hpp>
namespace typon
{
struct ForkList
{
struct Node
{
Node * _next;
std::coroutine_handle<> _coroutine;
std::exception_ptr * _exception;
};
Node * _first = nullptr;
template <typename Promise>
requires requires (Promise p)
{
{ p._exception } -> std::same_as<std::exception_ptr&>;
{ p._node } -> std::same_as<Node&>;
}
void insert(std::coroutine_handle<Promise> coroutine) noexcept
{
std::exception_ptr * exception = &(coroutine.promise()._exception);
coroutine.promise()._node = { _first, coroutine, exception };
_first = &(coroutine.promise()._node);
}
void check_exceptions()
{
for (auto node = _first; node != nullptr; node = node->_next)
{
std::exception_ptr & exception = *(node->_exception);
if (exception)
{
std::rethrow_exception(exception);
}
}
}
void clear() noexcept
{
auto next = _first;
while(next)
{
auto node = next;
next = next->_next;
node->_coroutine.destroy();
}
_first = nullptr;
}
};
struct Parent : Continuation
{
using u64 = Continuation::u64;
static constexpr u64 UMAX = std::numeric_limits<u64>::max();
std::coroutine_handle<> _continuation;
ForkList _children;
std::atomic<u64> _n = UMAX;
Parent(std::coroutine_handle<> coroutine) noexcept
: Continuation(coroutine)
{}
void resume()
{
_coroutine.resume();
}
operator std::coroutine_handle<>() noexcept
{
return _coroutine;
}
std::coroutine_handle<> continuation() noexcept
{
if (_coroutine.done())
{
return _continuation;
}
return _coroutine;
}
};
}
#endif // TYPON_CORE_PARENT_HPP_INCLUDED
......@@ -59,7 +59,7 @@ namespace typon
get()._notifyer.notify_one();
}
static void push(Continuation task) noexcept
static void push(Continuation * task) noexcept
{
get()._stack[thread_id]->push(task);
}
......@@ -179,8 +179,8 @@ namespace typon
{
if (auto task = stack->steal())
{
task.thefts()++;
coroutine = task;
task->_thefts++;
coroutine = task->_coroutine;
}
return;
}
......@@ -188,8 +188,8 @@ namespace typon
{
if (auto task = stack->pop_top())
{
task.thefts()++;
coroutine = task;
task->_thefts++;
coroutine = task->_coroutine;
return;
}
if (stack->_state.compare_exchange_strong(state, Stack::EMPTY))
......
......@@ -9,13 +9,15 @@
#include <typon/fundamental/ring_buffer.hpp>
#include <typon/core/continuation.hpp>
namespace typon
{
struct Stack
{
using ring_buffer = fdt::lock_free::ring_buffer<Continuation>;
using ring_buffer = fdt::lock_free::ring_buffer<Continuation *>;
using u64 = ring_buffer::u64;
using enum std::memory_order;
......@@ -38,7 +40,7 @@ namespace typon
delete _buffer.load(relaxed);
}
void push(Continuation x) noexcept
void push(Continuation * x) noexcept
{
u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire);
......@@ -73,7 +75,7 @@ namespace typon
return false;
}
Continuation steal() noexcept
Continuation * steal() noexcept
{
u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst);
......@@ -81,22 +83,22 @@ namespace typon
ring_buffer * buffer = _buffer.load(consume);
if (top < bottom)
{
Continuation x = buffer->get(top);
Continuation * x = buffer->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
return { nullptr };
return nullptr;
}
return x;
}
return { nullptr };
return nullptr;
}
Continuation pop_top() noexcept
Continuation * pop_top() noexcept
{
u64 top = _top.load(relaxed);
u64 bottom = _bottom.load(relaxed);
auto buffer = _buffer.load(relaxed);
Continuation x { nullptr };
Continuation * x = nullptr;
if (top < bottom)
{
x = buffer->get(top);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment