Commit 9ae824cc authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

refs #5351 add circular_buffer, no comments yet

git-svn-id: file:///svn/toku/tokudb@48715 c7de825b-a66e-492c-adef-691d508d4ae1
parent abdf7c7c
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
namespace toku {
template<typename T>
void circular_buffer<T>::init(T * const array, size_t cap) {
invariant_notnull(array);
toku_mutex_init(&m_lock, nullptr);
toku_cond_init(&m_cond, nullptr);
m_array = array;
m_cap = cap;
m_begin = 0;
m_limit = 0;
}
template<typename T>
void circular_buffer<T>::deinit(void) {
lock();
invariant(this->is_empty());
unlock();
toku_cond_destroy(&m_cond);
toku_mutex_destroy(&m_lock);
}
template<typename T>
void circular_buffer<T>::lock(void) {
toku_mutex_lock(&m_lock);
}
template<typename T>
void circular_buffer<T>::unlock(void) {
toku_mutex_unlock(&m_lock);
}
template<typename T>
size_t circular_buffer<T>::size(void) const {
toku_mutex_assert_locked(&m_lock);
return m_limit - m_begin;
}
template<typename T>
bool circular_buffer<T>::is_empty(void) const {
return size() == 0;
}
template<typename T>
bool circular_buffer<T>::is_full(void) const {
return size() == m_cap;
}
template<typename N>
__attribute__((const))
static inline N mod(N a, N b) {
return ((a % b) + a) % b;
}
template<typename T>
T *circular_buffer<T>::get_addr(size_t idx) {
toku_mutex_assert_locked(&m_lock);
invariant(idx >= m_begin);
invariant(idx < m_limit);
return &m_array[mod(idx, m_cap)];
}
template<typename T>
void circular_buffer<T>::push_and_maybe_signal_unlocked(const T &elt) {
toku_mutex_assert_locked(&m_lock);
invariant(!is_full());
bool will_signal = is_empty();
size_t location = m_limit++;
*get_addr(location) = elt;
if (will_signal) {
toku_cond_signal(&m_cond);
}
}
template<typename T>
void circular_buffer<T>::push(const T &elt) {
lock();
while (is_full()) {
toku_cond_wait(&m_cond, &m_lock);
}
push_and_maybe_signal_unlocked(elt);
unlock();
}
template<typename T>
bool circular_buffer<T>::trypush(const T &elt) {
bool pushed = false;
lock();
if (!is_full()) {
push_and_maybe_signal_unlocked(elt);
pushed = true;
}
unlock();
return pushed;
}
template<typename T>
T circular_buffer<T>::pop_and_maybe_signal_unlocked(void) {
toku_mutex_assert_locked(&m_lock);
invariant(!is_empty());
bool will_signal = is_full();
T ret = *get_addr(m_begin);
++m_begin;
if (will_signal) {
toku_cond_signal(&m_cond);
}
return ret;
}
template<typename T>
T circular_buffer<T>::pop(void) {
lock();
while (is_empty()) {
toku_cond_wait(&m_cond, &m_lock);
}
T ret = pop_and_maybe_signal_unlocked();
unlock();
return ret;
}
template<typename T>
bool circular_buffer<T>::trypop(T * const eltp) {
bool popped = false;
invariant_notnull(eltp);
lock();
if (!is_empty()) {
*eltp = pop_and_maybe_signal_unlocked();
popped = true;
}
unlock();
return popped;
}
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef CIRCULAR_BUFFER_H
#define CIRCULAR_BUFFER_H
#include <stdbool.h>
#include <stddef.h>
#include "toku_pthread.h"
namespace toku {
template<typename T>
class circular_buffer {
public:
__attribute__((nonnull))
void init(T * const array, size_t cap);
void deinit(void);
void push(const T &);
bool trypush(const T &);
T pop(void);
__attribute__((nonnull))
bool trypop(T * const);
private:
void lock(void);
void unlock(void);
size_t size(void) const;
bool is_empty(void) const;
bool is_full(void) const;
T *get_addr(size_t);
void push_and_maybe_signal_unlocked(const T &elt);
T pop_and_maybe_signal_unlocked(void);
toku_mutex_t m_lock;
toku_cond_t m_cond;
T *m_array;
size_t m_cap;
size_t m_begin, m_limit;
};
}
#include "circular_buffer.cc"
#endif // CIRCULAR_BUFFER_H
......@@ -47,6 +47,7 @@ if(BUILD_TESTING)
endforeach(test)
add_helgrind_test(portability/helgrind_test_partitioned_counter $<TARGET_FILE:test_partitioned_counter>)
add_helgrind_test(portability/helgrind_test-circular-buffer $<TARGET_FILE:test-circular-buffer>)
foreach(test ${tests})
add_test(portability/${test} ${test})
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include "toku_assert.h"
#include "circular_buffer.h"
#include "memory.h"
#include "toku_time.h"
#include "test.h"
static int verbose = 0;
static volatile bool running;
static void *producer(void *extra) {
toku::circular_buffer<uint32_t> *buf = static_cast<toku::circular_buffer<uint32_t> *>(extra);
while (running) {
buf->push(random());
usleep(random() % 10000);
}
return nullptr;
}
struct consumer_extra {
toku::circular_buffer<uint32_t> *buf;
uint32_t xorsum;
};
static void *consumer(void *extra) {
struct consumer_extra *e = static_cast<struct consumer_extra *>(extra);
while (running) {
e->xorsum ^= e->buf->pop();
usleep(random() % 1000);
}
uint32_t x;
bool popped = e->buf->trypop(&x);
while (popped) {
e->xorsum ^= x;
popped = e->buf->trypop(&x);
}
return nullptr;
}
static void test_with_threads(void) {
const size_t asize = 10000;
uint32_t array[asize];
toku::circular_buffer<uint32_t> buf;
ZERO_STRUCT(buf);
buf.init(array, asize);
bool swapped = __sync_bool_compare_and_swap(&running, false, true);
invariant(swapped);
struct consumer_extra extra = { .buf = &buf, .xorsum = 0 };
toku_pthread_t consumer_thd;
int r = toku_pthread_create(&consumer_thd, nullptr, consumer, &extra);
invariant_zero(r);
const int nproducers = 10;
toku_pthread_t producer_thds[nproducers];
for (int i = 0; i < nproducers; ++i) {
r = toku_pthread_create(&producer_thds[i], nullptr, producer, &buf);
invariant_zero(r);
}
usleep(20 * 1000 * 1000);
swapped = __sync_bool_compare_and_swap(&running, true, false);
invariant(swapped);
for (int i = 0; i < nproducers; ++i) {
r = toku_pthread_join(producer_thds[i], nullptr);
invariant_zero(r);
}
r = toku_pthread_join(consumer_thd, nullptr);
invariant_zero(r);
buf.deinit();
if (verbose) {
printf("%" PRIu32 "\n", extra.xorsum);
}
}
int test_main(int argc, char *const argv[]) {
{
const char *progname=argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0],"-v")==0) {
++verbose;
} else if (strcmp(argv[0],"-q")==0) {
verbose=0;
} else {
fprintf(stderr, "Usage:\n %s [-v] [-q]\n", progname);
exit(1);
}
argc--; argv++;
}
}
test_with_threads();
return 0;
}
......@@ -108,12 +108,12 @@ toku_mutex_unlock(toku_mutex_t *mutex) {
#if TOKU_PTHREAD_DEBUG
static inline void
toku_mutex_assert_locked(toku_mutex_t *mutex) {
toku_mutex_assert_locked(const toku_mutex_t *mutex) {
invariant(mutex->locked);
}
#else
static inline void
toku_mutex_assert_locked(toku_mutex_t *mutex __attribute__((unused))) {
toku_mutex_assert_locked(const toku_mutex_t *mutex __attribute__((unused))) {
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment