Commit d0a9f6aa authored by Xavier Thompson's avatar Xavier Thompson

WIP: Add gc.hpp

parent 61a969e2
......@@ -9,6 +9,7 @@
#include <typon/fundamental/deque.hpp>
#include <typon/fundamental/event_count.hpp>
#include <typon/fundamental/gc.hpp>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp>
......@@ -23,17 +24,10 @@ namespace typon
using uint = unsigned int;
using Deque = fdt::lock_free::deque<Continuation>;
using Task = typename Deque::pop_type;
using GC = fdt::lock_free::gc;
static inline thread_local uint thread_id;
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::vector<Deque> _deque;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer;
const uint _concurrency;
static Scheduler & get() noexcept
{
static Scheduler scheduler {std::thread::hardware_concurrency()};
......@@ -42,24 +36,51 @@ namespace typon
static void push(Continuation task) noexcept
{
get()._deque[thread_id].push(task);
Scheduler & scheduler = get();
if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
}
static void schedule(Continuation task) noexcept
{
Scheduler & scheduler = get();
scheduler._deque[thread_id].push(task);
Scheduler & scheduler = get();
if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
scheduler._notifyer.notify_one();
}
static Task pop() noexcept
{
return get()._deque[thread_id].pop();
Scheduler & scheduler = get();
Deque & deque = scheduler._deque[thread_id];
Task task = deque.pop();
if (task.match(Deque::Compress) || task.match(Deque::Prune))
{
if (auto array = deque.shrink())
{
scheduler._gc.retire(array);
}
}
return task;
}
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::vector<Deque> _deque;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer;
const uint _concurrency;
GC _gc;
Scheduler(uint concurrency) noexcept
: _deque(concurrency + 1)
, _concurrency(concurrency)
, _gc(concurrency)
{
thread_id = concurrency;
......@@ -117,6 +138,7 @@ namespace typon
void explore_task(Task & task) noexcept
{
_gc.enter(thread_id);
for (uint i = 0; i < _concurrency * 2 + 1; i++)
{
uint id = fdt::random::random() % _concurrency;
......@@ -134,6 +156,7 @@ namespace typon
break;
}
}
_gc.leave(thread_id);
}
bool wait_for_task(Task & task) noexcept
......
......@@ -19,13 +19,15 @@ namespace typon::fdt::lock_free
struct deque
{
using array_type = ring_buffer<T>;
using pop_type = fdt::optional<T, 2>;
using pop_type = fdt::optional<T>;
using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64;
static constexpr typename pop_type::template state<0> Empty {};
static constexpr typename pop_type::template state<1> Abort {};
static constexpr typename pop_type::template state<2> Abort {};
static constexpr typename pop_type::template state<4> Compress {};
static constexpr typename pop_type::template state<3> Prune {};
using enum std::memory_order;
......@@ -42,19 +44,22 @@ namespace typon::fdt::lock_free
delete _array.load(relaxed);
}
void push(T x) noexcept
auto push(T x) noexcept
{
u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire);
array_type * array = _array.load(relaxed);
array_type * garbage = nullptr;
if (bottom - top > array->capacity() - 1)
{
garbage = array;
array = array->grow(top, bottom);
_array.store(array);
}
array->put(bottom, x);
std::atomic_thread_fence(release);
_bottom.store(bottom + 1, relaxed);
return garbage;
}
pop_type pop() noexcept
......@@ -64,7 +69,16 @@ namespace typon::fdt::lock_free
_bottom.store(bottom, relaxed);
std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed);
pop_type x { Empty };
u64 capacity = array->capacity();
pop_type x;
if (capacity > 16)
{
x = { Compress };
}
else
{
x = { Empty };
}
if (top <= bottom)
{
x = array->get(bottom);
......@@ -72,10 +86,24 @@ namespace typon::fdt::lock_free
{
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
x = { Empty };
if (capacity > 16)
{
x = { Compress };
}
else
{
x = { Empty };
}
}
_bottom.store(bottom + 1, relaxed);
}
if (capacity > 16 && bottom - top > capacity / 4)
{
if (x)
{
x = { *x, Prune };
}
}
}
else
{
......@@ -84,6 +112,20 @@ namespace typon::fdt::lock_free
return x;
}
array_type * shrink() noexcept
{
u64 bottom = _bottom.load(relaxed);
array_type * array = _array.load(relaxed);
u64 top = _top.load(relaxed);
array_type * next = array->shrink(top, bottom);
if (next)
{
_array.store(next, relaxed);
return array;
}
return nullptr;
}
pop_type steal() noexcept
{
u64 top = _top.load(acquire);
......
#ifndef TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#include <atomic>
#include <bit>
#include <cstdint>
#include <deque>
#include <type_traits>
#include <typon/fundamental/meta.hpp>
#include <typon/fundamental/ring_buffer.hpp>
namespace typon::fdt::lock_free
{
struct gc
{
using u64 = std::uint_fast64_t;
using uint = unsigned int;
using enum std::memory_order;
struct node
{
const u64 _stamp;
std::atomic<node *> _next {nullptr};
node(u64 stamp) noexcept : _stamp(stamp) {};
virtual ~node() {}
};
template <typename T>
struct garbage : node
{
T * _ptr;
garbage(T * ptr, u64 stamp) noexcept
: node(stamp)
, _ptr(ptr)
{}
virtual ~garbage()
{
delete _ptr;
}
};
const uint _concurrency;
std::atomic<u64> * const _stamps;
std::atomic<u64> _stamp {0};
std::atomic<node *> _head {nullptr};
std::atomic<node *> _tail {nullptr};
gc(uint concurrency) noexcept
: _concurrency(concurrency)
, _stamps(new std::atomic<u64>[concurrency])
{
for (uint i = 0; i < _concurrency; i++)
{
_stamps[i].store(u64(-1));
}
}
void enter(uint id) noexcept
{
_stamps[id].store(_stamp.fetch_add(1));
}
template <typename T>
void retire(T * ptr) noexcept
{
auto stamp = _stamp.load();
auto node = new garbage<T> { ptr, stamp };
auto head = _head.exchange(node);
if (head)
{
head->_next.store(node);
}
else
{
_tail.store(node);
}
}
void leave(uint id) noexcept
{
_stamps[id].store(u64(-1));
if (_tail.load())
{
reclaim(oldest());
}
}
u64 oldest() noexcept
{
u64 oldest = u64(-1);
for (uint i = 0; i < _concurrency; i++)
{
u64 stamp = _stamps[i].load(relaxed);
if (stamp < oldest)
{
oldest = stamp;
}
}
return oldest;
}
void reclaim(u64 oldest) noexcept
{
while (auto tail = _tail.load())
{
if (tail->_stamp >= oldest || !tail->_next)
{
break;
}
if (_tail.compare_exchange_strong(tail, tail->_next))
{
delete tail;
}
}
}
};
}
#endif // TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
......@@ -12,6 +12,9 @@ namespace typon::fdt::meta
using type = T<Args...>;
};
template <typename... T>
static constexpr bool always_false_v { false };
}
......
......@@ -7,16 +7,19 @@
namespace typon::fdt
{
template <typename T, unsigned char N>
template <typename T>
requires std::is_trivially_copyable_v<T>
struct optional
{
static_assert(N > 0, "N must be greater than 0");
template <unsigned char I>
requires (I <= N)
using state = std::integral_constant<unsigned char, I>;
template <unsigned char I>
using empty_state = state<2 * I>;
template <unsigned char I>
using engaged_state = state<2 * I + 1>;
unsigned char _state;
union
{
......@@ -26,14 +29,18 @@ namespace typon::fdt
optional() noexcept : _state(0) {}
template <unsigned char I>
requires (I < N)
requires (!(I & 1))
optional(state<I> state) noexcept : _state(state) {}
optional(T value) noexcept : _state(N), _value(value) {}
optional(T value) noexcept : _state(1), _value(value) {}
template <unsigned char I>
requires (bool(I & 1))
optional(T value, state<I> state) noexcept : _state(state), _value(value) {}
~optional()
{
if (_state == N)
if (_state & 1)
{
std::destroy_at(std::addressof(_value));
}
......@@ -41,7 +48,7 @@ namespace typon::fdt
operator bool() noexcept
{
return _state == N;
return _state & 1;
}
template <unsigned char I>
......
......@@ -19,22 +19,16 @@ namespace typon::fdt::lock_free
using enum std::memory_order;
const u64 _mask;
ring_buffer * _next;
std::atomic<T> * const _array;
ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept
ring_buffer(u8 bits) noexcept
: _mask((u64(1) << bits) - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~ring_buffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
......@@ -63,13 +57,19 @@ namespace typon::fdt::lock_free
ring_buffer * grow(u64 start, u64 end) noexcept
{
auto buffer = new ring_buffer(std::countr_one(_mask) + 1, this);
auto buffer = new ring_buffer(std::countr_one(_mask) + 1);
return fill(buffer, start, end);
}
ring_buffer * shrink(u64 start, u64 end) noexcept
{
return fill(std::exchange(_next, nullptr), start, end);
return nullptr;
u8 bits = std::countr_one(_mask);
if (bits < 3)
{
return nullptr;
}
return fill(new ring_buffer(bits - 1), start, end);
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment