Commit 68ca2bb2 authored by Xavier Thompson's avatar Xavier Thompson

Introduce detached concurrency with future.hpp

parent 0bf63dcb
......@@ -15,6 +15,8 @@ namespace typon
std::coroutine_handle<> _coroutine;
u64 _thefts = 0;
Continuation() noexcept {}
Continuation(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine)
{}
......
#ifndef TYPON_CORE_FUTURE_HPP_INCLUDED
#define TYPON_CORE_FUTURE_HPP_INCLUDED
#include <atomic>
#include <coroutine>
#include <cstdint>
#include <typon/core/continuation.hpp>
#include <typon/core/result.hpp>
#include <typon/core/scheduler.hpp>
#include <typon/core/stack.hpp>
namespace typon
{
template <typename T = void>
struct [[nodiscard]] Future
{
struct promise_type;
using u64 = Continuation::u64;
using enum std::memory_order;
static constexpr std::uintptr_t no_waiter {0};
static constexpr std::uintptr_t ready {1};
static constexpr std::uintptr_t discarded {2};
std::coroutine_handle<promise_type> _coroutine;
bool _ready = false;
Future(std::coroutine_handle<promise_type> coroutine) noexcept
: _coroutine(coroutine)
{}
Future(const Future &) = delete;
Future& operator=(const Future &) = delete;
Future(Future && other) noexcept
: _coroutine(std::exchange(other._coroutine, nullptr))
, _ready(other._ready)
{}
Future & operator=(Future other) noexcept
{
std::swap(_coroutine, other._coroutine);
std::swap(_ready, other._ready);
return *this;
}
~Future()
{
if (_coroutine)
{
if (_ready)
{
_coroutine.destroy();
}
else
{
auto state =_coroutine.promise()._state.exchange(discarded, acq_rel);
if (state == ready)
{
_coroutine.destroy();
}
}
}
}
struct promise_type : Result<T>
{
std::atomic<std::uintptr_t> _state { no_waiter };
Future get_return_object() noexcept
{
return { std::coroutine_handle<promise_type>::from_promise(*this) };
}
std::suspend_always initial_suspend() noexcept
{
return {};
}
auto final_suspend() noexcept
{
struct awaitable : std::suspend_always
{
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{
auto continuation = Scheduler::peek();
if (Scheduler::pop())
{
return continuation->_coroutine;
}
auto state = coroutine.promise()._state.exchange(ready, acq_rel);
if (state == discarded)
{
coroutine.destroy();
}
else
{
if (state > 2)
{
Scheduler::enable(reinterpret_cast<Stack *>(state));
}
}
return std::noop_coroutine();
}
};
return awaitable {};
}
};
auto operator co_await() &&
{
struct awaitable
{
Future _future;
std::coroutine_handle<promise_type> _coroutine;
Continuation _continuation;
awaitable(Future && f, std::coroutine_handle<promise_type> c) noexcept
: _future(std::move(f))
, _coroutine(c)
{}
bool await_ready() noexcept
{
return false;
}
auto await_suspend(std::coroutine_handle<> continuation) noexcept
{
_continuation._coroutine = continuation;
std::coroutine_handle<> on_stack_handle = _coroutine;
Scheduler::push(&(_continuation));
return on_stack_handle;
}
auto await_resume() noexcept
{
_future._ready = !_continuation._thefts;
return std::move(_future);
}
};
return awaitable { std::move(*this), _coroutine };
}
auto get()
{
struct awaitable
{
std::coroutine_handle<promise_type> _coroutine;
bool & _ready;
bool await_ready() noexcept
{
return _ready || _coroutine.promise()._state.load(acquire) == ready;
}
void await_suspend(std::coroutine_handle<> continuation) noexcept
{
auto stack = Scheduler::suspend(continuation);
auto state = reinterpret_cast<std::uintptr_t>(stack);
if (_coroutine.promise()._state.exchange(state, acq_rel) == ready)
{
Scheduler::enable(stack);
}
}
auto await_resume()
{
return _coroutine.promise().get();
}
};
return awaitable { _coroutine, this->_ready };
}
};
}
#endif // TYPON_CORE_FUTURE_HPP_INCLUDED
......@@ -74,6 +74,11 @@ namespace typon
return result;
}
static Continuation * peek() noexcept
{
return get()._stack[thread_id]->peek();
}
static auto suspend(std::coroutine_handle<> coroutine) noexcept
{
Stack * stack = std::exchange(get()._stack[thread_id], nullptr);
......
......@@ -75,6 +75,13 @@ namespace typon
return false;
}
Continuation * peek() noexcept
{
u64 bottom = _bottom.load(relaxed) - 1;
ring_buffer * buffer = _buffer.load(relaxed);
return buffer->get(bottom);
}
Continuation * steal() noexcept
{
u64 top = _top.load(acquire);
......
......@@ -5,6 +5,7 @@
#include <typon/core/fork.hpp>
#include <typon/core/forked.hpp>
#include <typon/core/future.hpp>
#include <typon/core/join.hpp>
#include <typon/core/promise.hpp>
#include <typon/core/root.hpp>
......@@ -23,6 +24,16 @@ namespace typon
co_return co_await std::move(local_task);
}
template <typename Task>
Future<typename Task::promise_type::value_type> future(Task task)
{
// Put the task in a local variable to ensure its destructor will
// be called on co_return instead of only on coroutine destruction.
Task local_task = std::move(task);
co_return co_await std::move(local_task);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment