Commit 549d499e authored by Xavier Thompson's avatar Xavier Thompson

Move and split deque.hpp to fundamental

parent c57ef80c
#ifndef TYPON_DEQUE_HPP_INCLUDED
#define TYPON_DEQUE_HPP_INCLUDED
#ifndef TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#include <atomic>
#include <cstdint>
......@@ -8,99 +8,36 @@
#include <utility>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/ring_buffer.hpp>
#include <typon/fundamental/scope.hpp>
namespace typon
namespace typon::fdt::lock_free
{
template <typename T>
requires std::is_trivially_copyable_v<T>
struct RingBuffer
struct deque
{
using u8 = std::uint_least8_t;
using u64 = std::uint_fast64_t;
using array_type = ring_buffer<T>;
using pop_type = fdt::optional<T, 2>;
using enum std::memory_order;
const u8 _bits;
const u64 _mask;
RingBuffer * const _next;
std::atomic<T> * const _array;
RingBuffer(u8 bits, RingBuffer * next = nullptr) noexcept
: _bits(bits)
, _mask(this->capacity() - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~RingBuffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
{
return u64(1) << _bits;
}
void put(u64 index, T object) noexcept
{
_array[index & _mask].store(std::move(object), relaxed);
}
using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64;
T get(u64 index) noexcept
{
return _array[index & _mask].load(relaxed);
}
RingBuffer * fill(RingBuffer * sink, u64 start, u64 end) noexcept
{
for (u64 i = start; i < end; i++)
{
sink->put(i, get(i));
}
return sink;
}
RingBuffer * grow(u64 start, u64 end) noexcept
{
return fill(new RingBuffer(_bits + 1, this), start, end);
}
RingBuffer * shrink(u64 start, u64 end) noexcept
{
return fill(std::exchange(_next, nullptr), start, end);
}
};
template <typename T>
struct Deque
{
using u8 = typename RingBuffer<T>::u8;
using u64 = typename RingBuffer<T>::u64;
using Array = RingBuffer<T>;
using Pop = fdt::optional<T, 2>;
static constexpr typename Pop::template state<0> Empty {};
static constexpr typename Pop::template state<1> Abort {};
static constexpr typename pop_type::template state<0> Empty {};
static constexpr typename pop_type::template state<1> Abort {};
using enum std::memory_order;
std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1};
std::atomic<Array *> _array;
std::atomic<array_type *> _array;
Deque(u8 bits = 2) noexcept
: _array(new Array(bits))
deque(u8 bits = 2) noexcept
: _array(new array_type(bits))
{}
~Deque()
~deque()
{
delete _array;
}
......@@ -109,7 +46,7 @@ namespace typon
{
u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire);
Array * array = _array.load(relaxed);
array_type * array = _array.load(relaxed);
if (bottom - top > array->capacity() - 1)
{
array = array->grow(top, bottom);
......@@ -120,10 +57,10 @@ namespace typon
_bottom.store(bottom + 1, relaxed);
}
Pop pop() noexcept
pop_type pop() noexcept
{
u64 bottom = _bottom.load(relaxed) - 1;
Array * array = _array.load(relaxed);
array_type * array = _array.load(relaxed);
_bottom.store(bottom, relaxed);
std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed);
......@@ -137,23 +74,23 @@ namespace typon
{
return { x };
}
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
bool win = _top.compare_exchange_strong(top, top + 1, seq_cst, relaxed);
_bottom.store(top + 1, relaxed);
if (win)
{
_bottom.store(top + 1, relaxed);
return { Empty };
return { x };
}
_bottom.store(top + 1, relaxed);
return { x };
return { Empty };
}
Pop steal() noexcept
pop_type steal() noexcept
{
u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst);
u64 bottom = _bottom.load(acquire);
if (top < bottom)
{
Array * array = _array.load(consume);
array_type * array = _array.load(consume);
T x = array->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
......@@ -168,4 +105,4 @@ namespace typon
}
#endif // TYPON_DEQUE_HPP_INCLUDED
#endif // TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#ifndef TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
#include <atomic>
namespace typon::fdt::lock_free
{
template <typename T>
requires std::is_trivially_copyable_v<T>
struct ring_buffer
{
using u8 = std::uint_least8_t;
using u64 = std::uint_fast64_t;
using enum std::memory_order;
const u8 _bits;
const u64 _mask;
ring_buffer * const _next;
std::atomic<T> * const _array;
ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept
: _bits(bits)
, _mask(this->capacity() - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~ring_buffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
{
return u64(1) << _bits;
}
void put(u64 index, T object) noexcept
{
_array[index & _mask].store(std::move(object), relaxed);
}
T get(u64 index) noexcept
{
return _array[index & _mask].load(relaxed);
}
ring_buffer * fill(ring_buffer * sink, u64 start, u64 end) noexcept
{
for (u64 i = start; i < end; i++)
{
sink->put(i, get(i));
}
return sink;
}
ring_buffer * grow(u64 start, u64 end) noexcept
{
return fill(new ring_buffer(_bits + 1, this), start, end);
}
ring_buffer * shrink(u64 start, u64 end) noexcept
{
return fill(std::exchange(_next, nullptr), start, end);
}
};
}
#endif // TYPON_FUNDAMENTAL_RINGBUFFER_HPP_INCLUDED
......@@ -7,12 +7,12 @@
#include <thread>
#include <vector>
#include <typon/fundamental/deque.hpp>
#include <typon/fundamental/event_count.hpp>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp>
#include <typon/continuation.hpp>
#include <typon/deque.hpp>
namespace typon
......@@ -21,8 +21,8 @@ namespace typon
struct Scheduler
{
using uint = unsigned int;
using Deque = Deque<continuation_handle>;
using Task = typename Deque::Pop;
using Deque = fdt::lock_free::deque<continuation_handle>;
using Task = typename Deque::pop_type;
static inline thread_local uint thread_id;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment