Commit 5b626121 authored by Xavier Thompson's avatar Xavier Thompson

deque.hpp: claim stolen slot before reading it

parent 2f66baef
...@@ -32,7 +32,7 @@ namespace typon ...@@ -32,7 +32,7 @@ namespace typon
std::vector<std::thread> _thread; std::vector<std::thread> _thread;
std::atomic_bool _done {false}; std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer; fdt::lock_free::event_count<> _notifyer;
const uint _parallelism; const uint _concurrency;
static Scheduler & get() noexcept static Scheduler & get() noexcept
{ {
...@@ -57,13 +57,13 @@ namespace typon ...@@ -57,13 +57,13 @@ namespace typon
return get()._deque[thread_id].pop(); return get()._deque[thread_id].pop();
} }
Scheduler(uint parallelism) noexcept Scheduler(uint concurrency) noexcept
: _deque(parallelism + 1) : _deque(concurrency + 1, Deque(concurrency + 1))
, _parallelism(parallelism) , _concurrency(concurrency)
{ {
thread_id = parallelism; thread_id = concurrency;
for (uint id = 0; id < parallelism; id++) for (uint id = 0; id < concurrency; id++)
{ {
_thread.emplace_back([this, id]() { _thread.emplace_back([this, id]() {
thread_id = id; thread_id = id;
...@@ -117,9 +117,9 @@ namespace typon ...@@ -117,9 +117,9 @@ namespace typon
void explore_task(Task & task) noexcept void explore_task(Task & task) noexcept
{ {
for (uint i = 0; i < _parallelism * 2 + 1; i++) for (uint i = 0; i < _concurrency * 2 + 1; i++)
{ {
uint id = fdt::random::random() % _parallelism; uint id = fdt::random::random() % _concurrency;
if (id == thread_id) if (id == thread_id)
{ {
task = _deque.back().steal(); task = _deque.back().steal();
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED #define TYPON_FUNDAMENTAL_DEQUE_HPP_INCLUDED
#include <atomic> #include <atomic>
#include <bit>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <type_traits> #include <type_traits>
...@@ -32,9 +33,16 @@ namespace typon::fdt::lock_free ...@@ -32,9 +33,16 @@ namespace typon::fdt::lock_free
std::atomic<u64> _top {1}; std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1}; std::atomic<u64> _bottom {1};
std::atomic<array_type *> _array; std::atomic<array_type *> _array;
const unsigned int _concurrency;
deque(u8 bits = 2) noexcept deque(unsigned int concurrency) noexcept
: _array(new array_type(bits)) : _array(new array_type(std::bit_width(concurrency) + 1))
, _concurrency(concurrency)
{}
deque(const deque & other) noexcept
: _array(new array_type(std::bit_width(other._concurrency) + 1))
, _concurrency(other._concurrency)
{} {}
~deque() ~deque()
...@@ -47,7 +55,7 @@ namespace typon::fdt::lock_free ...@@ -47,7 +55,7 @@ namespace typon::fdt::lock_free
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
array_type * array = _array.load(relaxed); array_type * array = _array.load(relaxed);
if (bottom - top > array->capacity() - 1) if (bottom - top > array->capacity() - _concurrency)
{ {
array = array->grow(top, bottom); array = array->grow(top, bottom);
_array.store(array); _array.store(array);
...@@ -92,12 +100,16 @@ namespace typon::fdt::lock_free ...@@ -92,12 +100,16 @@ namespace typon::fdt::lock_free
if (top < bottom) if (top < bottom)
{ {
array_type * array = _array.load(consume); array_type * array = _array.load(consume);
T x = array->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{ {
return { Abort }; return { Abort };
} }
return { x }; // NB: The original algorithm reads the slot before claiming it.
// We read it after. To make sure it isn't overwritten, the push
// operation leaves at least as many slots free as there are
// potentially concurrent stealers.
// This also means reading non-atomically is not a data race.
return { array->get(top) };
} }
return { Empty }; return { Empty };
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment