Commit 4e1d58ab authored by Xavier Thompson's avatar Xavier Thompson

deque.hpp: Use locking implementation

parent 4d2a0099
...@@ -4,13 +4,12 @@ ...@@ -4,13 +4,12 @@
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
#include <exception> #include <exception>
#include <optional>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <typon/fundamental/deque.hpp> #include <typon/fundamental/deque.hpp>
#include <typon/fundamental/event_count.hpp> #include <typon/fundamental/event_count.hpp>
#include <typon/fundamental/gc.hpp>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp> #include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp> #include <typon/core/continuation.hpp>
...@@ -22,9 +21,8 @@ namespace typon ...@@ -22,9 +21,8 @@ namespace typon
struct Scheduler struct Scheduler
{ {
using uint = unsigned int; using uint = unsigned int;
using Deque = fdt::lock_free::deque<Continuation>; using Deque = fdt::deque<Continuation>;
using Task = typename Deque::pop_type; using Task = std::optional<Continuation>;
using GC = fdt::lock_free::gc;
static inline thread_local uint thread_id; static inline thread_local uint thread_id;
...@@ -48,17 +46,7 @@ namespace typon ...@@ -48,17 +46,7 @@ namespace typon
static Task pop() noexcept static Task pop() noexcept
{ {
Scheduler & scheduler = get(); return get()._deque[thread_id].pop();
Deque & deque = scheduler._deque[thread_id];
Task task = deque.pop();
if (task.match(Deque::Prune))
{
if (auto array = deque.shrink())
{
scheduler._gc.retire(array);
}
}
return task;
} }
std::atomic<uint> _actives = 0; std::atomic<uint> _actives = 0;
...@@ -68,12 +56,10 @@ namespace typon ...@@ -68,12 +56,10 @@ namespace typon
std::atomic_bool _done {false}; std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer; fdt::lock_free::event_count<> _notifyer;
const uint _concurrency; const uint _concurrency;
GC _gc;
Scheduler(uint concurrency) noexcept Scheduler(uint concurrency) noexcept
: _deque(concurrency + 1) : _deque(concurrency + 1)
, _concurrency(concurrency) , _concurrency(concurrency)
, _gc(concurrency)
{ {
thread_id = concurrency; thread_id = concurrency;
...@@ -131,7 +117,6 @@ namespace typon ...@@ -131,7 +117,6 @@ namespace typon
void explore_task(Task & task) noexcept void explore_task(Task & task) noexcept
{ {
_gc.enter(thread_id);
for (uint i = 0; i < _concurrency * 2 + 1; i++) for (uint i = 0; i < _concurrency * 2 + 1; i++)
{ {
uint id = fdt::random::random() % _concurrency; uint id = fdt::random::random() % _concurrency;
...@@ -149,55 +134,51 @@ namespace typon ...@@ -149,55 +134,51 @@ namespace typon
break; break;
} }
} }
_gc.leave(thread_id);
} }
bool wait_for_task(Task & task) noexcept bool wait_for_task(Task & task) noexcept
{ {
wait_for_task: while (true)
_thieves.fetch_add(1);
explore_task:
explore_task(task);
if (task)
{ {
if (_thieves.fetch_sub(1) == 1) _thieves.fetch_add(1);
explore_task(task);
if (task)
{ {
_notifyer.notify_one(); if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
} }
return true; auto key = _notifyer.prepare_wait();
} task = _deque.back().steal();
auto key = _notifyer.prepare_wait();
task = _deque.back().steal();
if (task || task.match(Deque::Abort))
{
_notifyer.cancel_wait();
if (task) if (task)
{ {
_notifyer.cancel_wait();
if (_thieves.fetch_sub(1) == 1) if (_thieves.fetch_sub(1) == 1)
{ {
_notifyer.notify_one(); _notifyer.notify_one();
} }
return true; return true;
} }
goto explore_task; if (_done.load())
}
if (_done.load())
{
_notifyer.cancel_wait();
_notifyer.notify_all();
_thieves.fetch_sub(1);
return false;
}
if (_thieves.fetch_sub(1) == 1)
{
if (_actives.load() > 0)
{ {
_notifyer.cancel_wait(); _notifyer.cancel_wait();
goto wait_for_task; _notifyer.notify_all();
_thieves.fetch_sub(1);
return false;
} }
if (_thieves.fetch_sub(1) == 1)
{
if (_actives.load() > 0)
{
_notifyer.cancel_wait();
continue;
}
}
_notifyer.wait(key);
return true;
} }
_notifyer.wait(key);
return true;
} }
}; };
......
#ifndef TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED #ifndef TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED #define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#include <atomic>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <mutex>
#include <optional>
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/ring_buffer.hpp> #include <typon/fundamental/ring_buffer.hpp>
namespace typon::fdt::lock_free namespace typon::fdt
{ {
template <typename T> template <typename T>
struct deque struct deque
{ {
using array_type = ring_buffer<T>; using array_type = ring_buffer<T>;
using pop_type = fdt::optional<T>;
using u8 = typename ring_buffer<T>::u8; using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64; using u64 = typename ring_buffer<T>::u64;
static constexpr typename pop_type::template state<1> Abort {}; u64 _top {0};
static constexpr typename pop_type::template state<2> Prune {}; u64 _bottom {0};
array_type * _array;
using enum std::memory_order; std::mutex _mutex;
const u8 _bits;
std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1};
std::atomic<array_type *> _array;
deque(u8 bits = 3) noexcept deque(u8 bits = 3) noexcept
: _bits(bits) : _array(new array_type(bits))
, _array(new array_type(bits))
{} {}
~deque() ~deque()
{ {
delete _array.load(relaxed); delete _array;
} }
void push(T x) noexcept void push(T x) noexcept
{ {
u64 bottom = _bottom.load(relaxed); std::lock_guard<std::mutex> lock(_mutex);
u64 top = _top.load(acquire); if (_bottom - _top > _array->capacity() - 1)
array_type * array = _array.load(relaxed);
if (bottom - top > array->capacity() - 1)
{ {
array = array->grow(top, bottom); _array = _array->grow(_top, _bottom);
_array.store(array);
} }
array->put(bottom, x); _array->put(_bottom, x);
std::atomic_thread_fence(release); _bottom++;
_bottom.store(bottom + 1, relaxed);
} }
pop_type pop() noexcept std::optional<T> pop() noexcept
{ {
u64 bottom = _bottom.load(relaxed) - 1; std::lock_guard<std::mutex> lock(_mutex);
array_type * array = _array.load(relaxed); std::optional<T> x {};
_bottom.store(bottom, relaxed); if (_top < _bottom)
std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed);
pop_type x {};
if (top <= bottom)
{
x = array->get(bottom);
if (top == bottom)
{
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
x = {};
}
_bottom.store(bottom + 1, relaxed);
}
}
else
{ {
_bottom.store(bottom + 1, relaxed); _bottom--;
x = _array->get(_bottom);
} }
u64 capacity = array->capacity(); if (_array->capacity() > (_bottom - _top) * 4)
if (capacity > u64(1) << _bits && capacity > (bottom - top) * 4)
{ {
x.set_state(Prune); if (auto next = _array->shrink(_top, _bottom))
{
delete _array;
_array = next;
}
} }
return x; return x;
} }
array_type * shrink() noexcept std::optional<T> steal() noexcept
{ {
u64 bottom = _bottom.load(relaxed); std::lock_guard<std::mutex> lock(_mutex);
array_type * array = _array.load(relaxed); std::optional<T> x {};
u64 top = _top.load(relaxed); if (_top < _bottom)
array_type * next = array->shrink(top, bottom);
if (next)
{ {
_array.store(next, relaxed); x = _array->get(_top);
return array; _top++;
} }
return nullptr; return x;
}
pop_type steal() noexcept
{
u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst);
u64 bottom = _bottom.load(acquire);
if (top < bottom)
{
array_type * array = _array.load(consume);
T x = array->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
return { Abort };
}
return { x };
}
return {};
} }
}; };
......
#ifndef TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#include <atomic>
#include <bit>
#include <cstdint>
#include <deque>
#include <type_traits>
#include <typon/fundamental/meta.hpp>
#include <typon/fundamental/ring_buffer.hpp>
namespace typon::fdt::lock_free
{
struct gc
{
using u64 = std::uint_fast64_t;
using uint = unsigned int;
using enum std::memory_order;
struct node
{
const u64 _stamp;
std::atomic<node *> _next {nullptr};
node(u64 stamp) noexcept : _stamp(stamp) {};
virtual ~node() {}
};
template <typename T>
struct garbage : node
{
T * _ptr;
garbage(T * ptr, u64 stamp) noexcept
: node(stamp)
, _ptr(ptr)
{}
virtual ~garbage()
{
delete _ptr;
}
};
const uint _concurrency;
const uint _bits;
std::atomic<u64> * const _stamps;
std::atomic<u64> _state {0};
std::atomic<node *> _head;
std::atomic<node *> _tail;
gc(uint concurrency) noexcept
: _concurrency(concurrency)
, _bits(std::bit_width(concurrency))
, _stamps(new std::atomic<u64>[concurrency])
{
auto first = new node(0);
_head.store(first);
_tail.store(first);
for (uint i = 0; i < _concurrency; i++)
{
_stamps[i].store(u64(-1));
}
}
void enter(uint id) noexcept
{
auto state = _state.fetch_add((1 << _bits) + 1);
_stamps[id].store(state >> _bits);
}
template <typename T>
void retire(T * ptr) noexcept
{
auto state = _state.load();
if ((state & ((1 << _bits) - 1)) == 0)
{
delete ptr;
}
else
{
auto node = new garbage<T> { ptr, state >> _bits };
auto head = _head.exchange(node);
head->_next.store(node);
}
}
void leave(uint id) noexcept
{
_state.fetch_sub(1);
_stamps[id].store(u64(-1));
if (_tail.load())
{
reclaim(oldest());
}
}
u64 oldest() noexcept
{
u64 oldest = u64(-1);
for (uint i = 0; i < _concurrency; i++)
{
u64 stamp = _stamps[i].load(relaxed);
if (stamp < oldest)
{
oldest = stamp;
}
}
return oldest;
}
void reclaim(u64 oldest) noexcept
{
auto tail = _tail.load();
while (true)
{
if (tail->_stamp >= oldest)
{
break;
}
auto next = tail->_next.load();
if (!next)
{
break;
}
if (_tail.compare_exchange_strong(tail, next))
{
delete tail;
tail = next;
}
}
}
~gc()
{
delete[] _stamps;
auto tail = _tail.load();
while (tail)
{
auto next = tail->_next.load();
delete tail;
tail = next;
}
}
};
}
#endif // TYPON_FUNDAMENTAL_GC_HPP_INCLUDED
#ifndef TYPON_FUNDAMENTAL_OPTIONAL_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_OPTIONAL_HPP_INCLUDED
#include <type_traits>
namespace typon::fdt
{
template <typename T>
requires std::is_trivially_copyable_v<T>
struct optional
{
template <unsigned char I>
using state = std::integral_constant<unsigned char, I>;
unsigned char _state;
union
{
T _value;
};
optional() noexcept : _state(0) {}
template <unsigned char I>
optional(state<I> state) noexcept : _state(state() << 1 & (~1)) {}
optional(T value) noexcept : _state(1), _value(value) {}
template <unsigned char I>
optional(T value, state<I> state) noexcept
: _state((state() << 1) | 1)
, _value(value)
{}
~optional()
{
if (_state & 1)
{
std::destroy_at(std::addressof(_value));
}
}
operator bool() noexcept
{
return _state & 1;
}
template <unsigned char I>
bool match(state<I> state) noexcept
{
return state() == _state >> 1;
}
template <unsigned char I>
void set_state(state<I> state) noexcept
{
_state = (state() << 1) | (_state & 1);
}
T * operator->() noexcept
{
return std::addressof(_value);
}
T & operator*() noexcept
{
return _value;
}
};
}
#endif // TYPON_FUNDAMENTAL_OPTIONAL_HPP_INCLUDED
#ifndef TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED #ifndef TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED #define TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
#include <atomic>
#include <bit> #include <bit>
#include <cstdint> #include <cstdint>
#include <type_traits> #include <type_traits>
namespace typon::fdt::lock_free namespace typon::fdt
{ {
template <typename T> template <typename T>
requires std::is_trivially_copyable_v<T>
struct ring_buffer struct ring_buffer
{ {
using u8 = std::uint_least8_t; using u8 = std::uint_least8_t;
using u64 = std::uint_fast64_t; using u64 = std::uint_fast64_t;
using enum std::memory_order;
const u64 _mask; const u64 _mask;
ring_buffer * _next; ring_buffer * _next;
std::atomic<T> * const _array; T * const _array;
ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept
: _mask((u64(1) << bits) - 1) : _mask((u64(1) << bits) - 1)
, _next(next) , _next(next)
, _array(new std::atomic<T>[this->capacity()]) , _array(new T[this->capacity()])
{} {}
~ring_buffer() ~ring_buffer()
...@@ -45,12 +41,12 @@ namespace typon::fdt::lock_free ...@@ -45,12 +41,12 @@ namespace typon::fdt::lock_free
void put(u64 index, T object) noexcept void put(u64 index, T object) noexcept
{ {
_array[index & _mask].store(std::move(object), relaxed); _array[index & _mask] = std::move(object);
} }
T get(u64 index) noexcept T get(u64 index) noexcept
{ {
return _array[index & _mask].load(relaxed); return _array[index & _mask];
} }
ring_buffer * fill(ring_buffer * sink, u64 start, u64 end) noexcept ring_buffer * fill(ring_buffer * sink, u64 start, u64 end) noexcept
...@@ -84,7 +80,6 @@ namespace typon::fdt::lock_free ...@@ -84,7 +80,6 @@ namespace typon::fdt::lock_free
return nullptr; return nullptr;
} }
return fill(std::exchange(last->_next, nullptr), start, end); return fill(std::exchange(last->_next, nullptr), start, end);
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment