Commit c24453b8 authored by Xavier Thompson's avatar Xavier Thompson

Move and split deque.hpp to fundamental

parent 31a41b44
#ifndef TYPON_DEQUE_HPP_INCLUDED #ifndef TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#define TYPON_DEQUE_HPP_INCLUDED #define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#include <atomic> #include <atomic>
#include <cstdint> #include <cstdint>
...@@ -8,99 +8,36 @@ ...@@ -8,99 +8,36 @@
#include <utility> #include <utility>
#include <typon/fundamental/optional.hpp> #include <typon/fundamental/optional.hpp>
#include <typon/fundamental/ring_buffer.hpp>
#include <typon/fundamental/scope.hpp>
namespace typon namespace typon::fdt::lock_free
{ {
template <typename T> template <typename T>
requires std::is_trivially_copyable_v<T> struct deque
struct RingBuffer
{ {
using u8 = std::uint_least8_t; using array_type = ring_buffer<T>;
using u64 = std::uint_fast64_t; using pop_type = fdt::optional<T, 2>;
using enum std::memory_order; using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64;
const u8 _bits;
const u64 _mask;
RingBuffer * const _next;
std::atomic<T> * const _array;
RingBuffer(u8 bits, RingBuffer * next = nullptr) noexcept
: _bits(bits)
, _mask(this->capacity() - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~RingBuffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
{
return u64(1) << _bits;
}
void put(u64 index, T object) noexcept
{
_array[index & _mask].store(std::move(object), relaxed);
}
T get(u64 index) noexcept
{
return _array[index & _mask].load(relaxed);
}
RingBuffer * fill(RingBuffer * sink, u64 start, u64 end) noexcept
{
for (u64 i = start; i < end; i++)
{
sink->put(i, get(i));
}
return sink;
}
RingBuffer * grow(u64 start, u64 end) noexcept static constexpr typename pop_type::template state<0> Empty {};
{ static constexpr typename pop_type::template state<1> Abort {};
return fill(new RingBuffer(_bits + 1, this), start, end);
}
RingBuffer * shrink(u64 start, u64 end) noexcept
{
return fill(std::exchange(_next, nullptr), start, end);
}
};
template <typename T>
struct Deque
{
using u8 = typename RingBuffer<T>::u8;
using u64 = typename RingBuffer<T>::u64;
using Array = RingBuffer<T>;
using Pop = fdt::optional<T, 2>;
static constexpr typename Pop::template state<0> Empty {};
static constexpr typename Pop::template state<1> Abort {};
using enum std::memory_order; using enum std::memory_order;
std::atomic<u64> _top {1}; std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1}; std::atomic<u64> _bottom {1};
std::atomic<Array *> _array; std::atomic<array_type *> _array;
Deque(u8 bits = 2) noexcept deque(u8 bits = 2) noexcept
: _array(new Array(bits)) : _array(new array_type(bits))
{} {}
~Deque() ~deque()
{ {
delete _array; delete _array;
} }
...@@ -109,7 +46,7 @@ namespace typon ...@@ -109,7 +46,7 @@ namespace typon
{ {
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
Array * array = _array.load(relaxed); array_type * array = _array.load(relaxed);
if (bottom - top > array->capacity() - 1) if (bottom - top > array->capacity() - 1)
{ {
array = array->grow(top, bottom); array = array->grow(top, bottom);
...@@ -120,10 +57,10 @@ namespace typon ...@@ -120,10 +57,10 @@ namespace typon
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
} }
Pop pop() noexcept pop_type pop() noexcept
{ {
u64 bottom = _bottom.load(relaxed) - 1; u64 bottom = _bottom.load(relaxed) - 1;
Array * array = _array.load(relaxed); array_type * array = _array.load(relaxed);
_bottom.store(bottom, relaxed); _bottom.store(bottom, relaxed);
std::atomic_thread_fence(seq_cst); std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed); u64 top = _top.load(relaxed);
...@@ -137,23 +74,23 @@ namespace typon ...@@ -137,23 +74,23 @@ namespace typon
{ {
return { x }; return { x };
} }
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) bool win = _top.compare_exchange_strong(top, top + 1, seq_cst, relaxed);
{
_bottom.store(top + 1, relaxed);
return { Empty };
}
_bottom.store(top + 1, relaxed); _bottom.store(top + 1, relaxed);
if (win)
{
return { x }; return { x };
} }
return { Empty };
}
Pop steal() noexcept pop_type steal() noexcept
{ {
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst); std::atomic_thread_fence(seq_cst);
u64 bottom = _bottom.load(acquire); u64 bottom = _bottom.load(acquire);
if (top < bottom) if (top < bottom)
{ {
Array * array = _array.load(consume); array_type * array = _array.load(consume);
T x = array->get(top); T x = array->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{ {
...@@ -168,4 +105,4 @@ namespace typon ...@@ -168,4 +105,4 @@ namespace typon
} }
#endif // TYPON_DEQUE_HPP_INCLUDED #endif // TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#ifndef TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
#include <atomic>
namespace typon::fdt::lock_free
{
template <typename T>
requires std::is_trivially_copyable_v<T>
struct ring_buffer
{
using u8 = std::uint_least8_t;
using u64 = std::uint_fast64_t;
using enum std::memory_order;
const u8 _bits;
const u64 _mask;
ring_buffer * const _next;
std::atomic<T> * const _array;
ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept
: _bits(bits)
, _mask(this->capacity() - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~ring_buffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
{
return u64(1) << _bits;
}
void put(u64 index, T object) noexcept
{
_array[index & _mask].store(std::move(object), relaxed);
}
T get(u64 index) noexcept
{
return _array[index & _mask].load(relaxed);
}
ring_buffer * fill(ring_buffer * sink, u64 start, u64 end) noexcept
{
for (u64 i = start; i < end; i++)
{
sink->put(i, get(i));
}
return sink;
}
ring_buffer * grow(u64 start, u64 end) noexcept
{
return fill(new ring_buffer(_bits + 1, this), start, end);
}
ring_buffer * shrink(u64 start, u64 end) noexcept
{
return fill(std::exchange(_next, nullptr), start, end);
}
};
}
#endif // TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
...@@ -7,12 +7,12 @@ ...@@ -7,12 +7,12 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <typon/fundamental/deque.hpp>
#include <typon/fundamental/event_count.hpp> #include <typon/fundamental/event_count.hpp>
#include <typon/fundamental/optional.hpp> #include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp> #include <typon/fundamental/random.hpp>
#include <typon/continuation.hpp> #include <typon/continuation.hpp>
#include <typon/deque.hpp>
namespace typon namespace typon
...@@ -21,8 +21,8 @@ namespace typon ...@@ -21,8 +21,8 @@ namespace typon
struct Scheduler struct Scheduler
{ {
using uint = unsigned int; using uint = unsigned int;
using Deque = Deque<continuation_handle>; using Deque = fdt::lock_free::deque<continuation_handle>;
using Task = typename Deque::Pop; using Task = typename Deque::pop_type;
static inline thread_local uint thread_id; static inline thread_local uint thread_id;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment