Commit 76e79c92 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

refs #5351 fixes to circular buffer

git-svn-id: file:///svn/toku/tokudb@48718 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9ae824cc
......@@ -9,20 +9,26 @@ namespace toku {
template<typename T>
void circular_buffer<T>::init(T * const array, size_t cap) {
invariant_notnull(array);
toku_mutex_init(&m_lock, nullptr);
toku_cond_init(&m_cond, nullptr);
m_array = array;
m_cap = cap;
m_begin = 0;
m_limit = 0;
toku_mutex_init(&m_lock, nullptr);
toku_cond_init(&m_push_cond, nullptr);
toku_cond_init(&m_pop_cond, nullptr);
m_push_waiters = 0;
m_pop_waiters = 0;
}
template<typename T>
void circular_buffer<T>::deinit(void) {
lock();
invariant(this->is_empty());
invariant(is_empty());
invariant_zero(m_push_waiters);
invariant_zero(m_pop_waiters);
unlock();
toku_cond_destroy(&m_cond);
toku_cond_destroy(&m_pop_cond);
toku_cond_destroy(&m_push_cond);
toku_mutex_destroy(&m_lock);
}
......@@ -70,11 +76,10 @@ namespace toku {
void circular_buffer<T>::push_and_maybe_signal_unlocked(const T &elt) {
toku_mutex_assert_locked(&m_lock);
invariant(!is_full());
bool will_signal = is_empty();
size_t location = m_limit++;
*get_addr(location) = elt;
if (will_signal) {
toku_cond_signal(&m_cond);
if (m_pop_waiters > 0) {
toku_cond_signal(&m_pop_cond);
}
}
......@@ -82,7 +87,9 @@ namespace toku {
void circular_buffer<T>::push(const T &elt) {
lock();
while (is_full()) {
toku_cond_wait(&m_cond, &m_lock);
++m_push_waiters;
toku_cond_wait(&m_push_cond, &m_lock);
--m_push_waiters;
}
push_and_maybe_signal_unlocked(elt);
unlock();
......@@ -92,7 +99,7 @@ namespace toku {
bool circular_buffer<T>::trypush(const T &elt) {
bool pushed = false;
lock();
if (!is_full()) {
if (!is_full() && m_push_waiters == 0) {
push_and_maybe_signal_unlocked(elt);
pushed = true;
}
......@@ -104,11 +111,10 @@ namespace toku {
T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) {
toku_mutex_assert_locked(&m_lock);
invariant(!is_empty());
bool will_signal = is_full();
T ret = *get_addr(m_begin);
++m_begin;
if (will_signal) {
toku_cond_signal(&m_cond);
if (m_push_waiters > 0) {
toku_cond_signal(&m_push_cond);
}
return ret;
}
......@@ -117,7 +123,9 @@ namespace toku {
T circular_buffer<T>::pop(void) {
lock();
while (is_empty()) {
toku_cond_wait(&m_cond, &m_lock);
++m_pop_waiters;
toku_cond_wait(&m_pop_cond, &m_lock);
--m_pop_waiters;
}
T ret = pop_and_maybe_signal_unlocked();
unlock();
......@@ -129,7 +137,7 @@ namespace toku {
bool popped = false;
invariant_notnull(eltp);
lock();
if (!is_empty()) {
if (!is_empty() && m_pop_waiters == 0) {
*eltp = pop_and_maybe_signal_unlocked();
popped = true;
}
......
......@@ -47,11 +47,13 @@ private:
T pop_and_maybe_signal_unlocked(void);
toku_mutex_t m_lock;
toku_cond_t m_cond;
T *m_array;
size_t m_cap;
size_t m_begin, m_limit;
toku_mutex_t m_lock;
toku_cond_t m_push_cond;
toku_cond_t m_pop_cond;
int m_push_waiters, m_pop_waiters;
};
}
......
......@@ -13,18 +13,20 @@
#include "toku_assert.h"
#include "circular_buffer.h"
#include "memory.h"
#include "toku_time.h"
#include "test.h"
static int verbose = 0;
static volatile bool running;
static volatile bool producers_joined;
static void *producer(void *extra) {
toku::circular_buffer<uint32_t> *buf = static_cast<toku::circular_buffer<uint32_t> *>(extra);
while (running) {
buf->push(random());
usleep(random() % 10000);
if (running) {
usleep(random() % 1000);
}
}
return nullptr;
......@@ -38,15 +40,15 @@ struct consumer_extra {
static void *consumer(void *extra) {
struct consumer_extra *e = static_cast<struct consumer_extra *>(extra);
while (running) {
while (!producers_joined) {
e->xorsum ^= e->buf->pop();
usleep(random() % 1000);
if (running) {
usleep(random() % 100);
}
}
uint32_t x;
bool popped = e->buf->trypop(&x);
while (popped) {
while (e->buf->trypop(&x)) {
e->xorsum ^= x;
popped = e->buf->trypop(&x);
}
return nullptr;
......@@ -83,6 +85,8 @@ static void test_with_threads(void) {
r = toku_pthread_join(producer_thds[i], nullptr);
invariant_zero(r);
}
swapped = __sync_bool_compare_and_swap(&producers_joined, false, true);
invariant(swapped);
r = toku_pthread_join(consumer_thd, nullptr);
invariant_zero(r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment