Commit f79f9af7 authored by Xavier Thompson's avatar Xavier Thompson

Enable opt-in deferred reclamation of forks

parent 673cea98
...@@ -63,15 +63,19 @@ namespace typon ...@@ -63,15 +63,19 @@ namespace typon
{ {
return span->_coroutine; return span->_coroutine;
} }
auto rank = coroutine.promise()._rank;
if (auto & exception = coroutine.promise()._exception) if (auto & exception = coroutine.promise()._exception)
{ {
span->set_exception(exception, coroutine.promise()._rank); span->set_exception(exception, rank);
} }
if (!(rank & 1))
{
auto & ref = coroutine.promise()._node._ref; auto & ref = coroutine.promise()._node._ref;
if (!ref.exchange(false, std::memory_order_acq_rel)) if (!ref.exchange(false, std::memory_order_acq_rel))
{ {
coroutine.destroy(); coroutine.destroy();
} }
}
u64 n = span->_n.fetch_sub(1, std::memory_order_acq_rel); u64 n = span->_n.fetch_sub(1, std::memory_order_acq_rel);
if (n == 1) if (n == 1)
{ {
...@@ -85,25 +89,45 @@ namespace typon ...@@ -85,25 +89,45 @@ namespace typon
} }
}; };
struct awaitable struct awaitable : std::suspend_always
{ {
std::coroutine_handle<promise_type> _coroutine; std::coroutine_handle<promise_type> _coroutine;
awaitable(std::coroutine_handle<promise_type> coroutine) noexcept template <typename Promise>
: _coroutine(coroutine) auto await_suspend(std::coroutine_handle<Promise> continuation) noexcept
{} {
Span * span = &(continuation.promise()._span);
_coroutine.promise()._span = span;
_coroutine.promise()._rank = (span->_thefts << 1);
std::coroutine_handle<> on_stack_handle = _coroutine;
Scheduler::push(span);
return on_stack_handle;
}
bool await_ready() noexcept auto await_resume()
{
auto thefts = _coroutine.promise()._span->_thefts;
auto rank = _coroutine.promise()._rank;
return Forked<T>(_coroutine, (thefts == (rank >> 1)), true);
}
};
auto operator co_await() &&
{ {
return false; return awaitable { {}, _coroutine };
} }
struct noloop_awaitable : std::suspend_always
{
std::coroutine_handle<promise_type> _coroutine;
template <typename Promise> template <typename Promise>
auto await_suspend(std::coroutine_handle<Promise> continuation) noexcept auto await_suspend(std::coroutine_handle<Promise> continuation) noexcept
{ {
Span * span = &(continuation.promise()._span); Span * span = &(continuation.promise()._span);
_coroutine.promise()._span = span; _coroutine.promise()._span = span;
_coroutine.promise()._rank = span->_thefts; _coroutine.promise()._rank = (span->_thefts << 1) + 1;
std::coroutine_handle<> on_stack_handle = _coroutine; std::coroutine_handle<> on_stack_handle = _coroutine;
Scheduler::push(span); Scheduler::push(span);
...@@ -114,13 +138,18 @@ namespace typon ...@@ -114,13 +138,18 @@ namespace typon
{ {
auto thefts = _coroutine.promise()._span->_thefts; auto thefts = _coroutine.promise()._span->_thefts;
auto rank = _coroutine.promise()._rank; auto rank = _coroutine.promise()._rank;
return Forked<T>(_coroutine, (thefts == rank)); bool ready = thefts == (rank >> 1);
if (!ready)
{
_coroutine.promise()._span->_children.push_back(_coroutine);
}
return Forked<T>(_coroutine, ready, false);
} }
}; };
auto operator co_await() && auto noloop() &&
{ {
return awaitable { _coroutine }; return noloop_awaitable { {}, _coroutine };
} }
}; };
......
...@@ -111,7 +111,7 @@ namespace typon ...@@ -111,7 +111,7 @@ namespace typon
Result<T> * _result = nullptr; Result<T> * _result = nullptr;
template <typename Promise> template <typename Promise>
Forked(std::coroutine_handle<Promise> coroutine, bool ready) Forked(std::coroutine_handle<Promise> coroutine, bool ready, bool owning)
{ {
if (ready) if (ready)
{ {
...@@ -120,7 +120,7 @@ namespace typon ...@@ -120,7 +120,7 @@ namespace typon
} }
else else
{ {
this->_node = &(coroutine.promise()._node); this->_node = owning ? &(coroutine.promise()._node) : nullptr;
_result = &(coroutine.promise()); _result = &(coroutine.promise());
} }
} }
...@@ -197,11 +197,11 @@ namespace typon ...@@ -197,11 +197,11 @@ namespace typon
ForkNode * _node; ForkNode * _node;
template <typename Promise> template <typename Promise>
Forked(std::coroutine_handle<Promise> coroutine, bool ready) Forked(std::coroutine_handle<Promise> coroutine, bool ready, bool owning)
{ {
_ready = ready; _ready = ready;
_result = &(coroutine.promise()); _result = &(coroutine.promise());
_node = &(coroutine.promise()._node); _node = (owning | ready) ? &(coroutine.promise()._node) : nullptr;
if (ready) if (ready)
{ {
if (auto & exception = coroutine.promise()._exception) if (auto & exception = coroutine.promise()._exception)
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <cstdint> #include <cstdint>
#include <exception> #include <exception>
#include <limits> #include <limits>
#include <vector>
#include <typon/defer.hpp> #include <typon/defer.hpp>
#include <typon/theft_point.hpp> #include <typon/theft_point.hpp>
...@@ -31,6 +32,8 @@ namespace typon ...@@ -31,6 +32,8 @@ namespace typon
std::atomic<Error *> _error { nullptr }; std::atomic<Error *> _error { nullptr };
std::vector<std::coroutine_handle<>> _children;
std::atomic<u64> _n = UMAX; std::atomic<u64> _n = UMAX;
Span(std::coroutine_handle<> coroutine) noexcept Span(std::coroutine_handle<> coroutine) noexcept
...@@ -43,6 +46,16 @@ namespace typon ...@@ -43,6 +46,16 @@ namespace typon
{ {
delete error; delete error;
} }
clear_children();
}
void clear_children() noexcept
{
for (auto & child : _children)
{
child.destroy();
}
_children.clear();
} }
void check_exception() void check_exception()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment