Commit 90d72eba authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Merge partitioned counters onto main. Fixes #5267.

git-svn-id: file:///svn/toku/tokudb@46044 c7de825b-a66e-492c-adef-691d508d4ae1
parent daa30d26
@@ -20,6 +20,7 @@
 #include "log-internal.h"
 #include "kibbutz.h"
 #include "background_job_manager.h"
+#include "partitioned_counter.h"

 ///////////////////////////////////////////////////////////////////////////////////
 // Engine status
@@ -30,12 +31,14 @@
 // These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
 // They were left here after engine status cleanup (#2949, rather than moved into the status struct)
 // so they are still easily available to the debugger and to save lots of typing.
-static u_int64_t cachetable_miss;
-static u_int64_t cachetable_misstime; // time spent waiting for disk read
-static u_int64_t cachetable_puts; // how many times has a newly created node been put into the cachetable?
-static u_int64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
-static u_int64_t cachetable_evictions;
-static u_int64_t cleaner_executions; // number of times the cleaner thread's loop has executed
+// if we had constructors and destructors, this would be cleaner. For now, we initialize with setup_cachetable_statistics().
+static PARTITIONED_COUNTER cachetable_miss;
+static PARTITIONED_COUNTER cachetable_misstime; // time spent waiting for disk read
+static PARTITIONED_COUNTER cachetable_puts; // how many times has a newly created node been put into the cachetable?
+static PARTITIONED_COUNTER cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
+static PARTITIONED_COUNTER cachetable_evictions;
+static PARTITIONED_COUNTER cleaner_executions; // number of times the cleaner thread's loop has executed

 static CACHETABLE_STATUS_S ct_status;
@@ -183,10 +186,10 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
     if (!ct_status.initialized) {
         status_init();
     }
-    STATUS_VALUE(CT_MISS) = cachetable_miss;
-    STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
-    STATUS_VALUE(CT_PUTS) = cachetable_puts;
-    STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
+    STATUS_VALUE(CT_MISS) = cachetable_miss.read();
+    STATUS_VALUE(CT_MISSTIME) = cachetable_misstime.read();
+    STATUS_VALUE(CT_PUTS) = cachetable_puts.read();
+    STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches.read();
     STATUS_VALUE(CT_SIZE_CURRENT) = ct->size_current;
     STATUS_VALUE(CT_SIZE_LIMIT) = ct->size_limit;
     STATUS_VALUE(CT_SIZE_WRITING) = ct->size_evicting;
@@ -194,8 +197,8 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
     STATUS_VALUE(CT_SIZE_LEAF) = ct->size_leaf;
     STATUS_VALUE(CT_SIZE_ROLLBACK) = ct->size_rollback;
     STATUS_VALUE(CT_SIZE_CACHEPRESSURE) = ct->size_cachepressure;
-    STATUS_VALUE(CT_EVICTIONS) = cachetable_evictions;
-    STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions;
+    STATUS_VALUE(CT_EVICTIONS) = cachetable_evictions.read();
+    STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions.read();
     STATUS_VALUE(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct);
     STATUS_VALUE(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct);
     *statp = ct_status;
@@ -857,7 +860,7 @@ static void cachetable_free_pair(CACHETABLE ct, PAIR p) {
     void *write_extraargs = p->write_extraargs;
     PAIR_ATTR old_attr = p->attr;
-    cachetable_evictions++;
+    cachetable_evictions.increment(1);
     cachetable_unlock(ct);
     PAIR_ATTR new_attr = p->attr;
     // Note that flush_callback is called with write_me FALSE, so the only purpose of this
@@ -1279,7 +1282,7 @@ static int cachetable_put_internal(
         }
     }
     // flushing could change the table size, but wont' change the fullhash
-    cachetable_puts++;
+    cachetable_puts.increment(1);
     PAIR p = cachetable_insert_at(
         ct,
         cachefile,
@@ -1916,8 +1919,8 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
         // The pair being fetched will be marked as pending if a checkpoint happens during the
         // fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean.
         cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, TRUE);
-        cachetable_miss++;
-        cachetable_misstime += get_tnow() - t0;
+        cachetable_miss.increment(1);
+        cachetable_misstime.increment(get_tnow() - t0);
         goto got_value;
     }
 got_value:
@@ -2142,8 +2145,8 @@ int toku_cachetable_get_and_pin_nonblocking (
             run_unlockers(unlockers); // we hold the ct mutex.
             u_int64_t t0 = get_tnow();
             cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, FALSE);
-            cachetable_miss++;
-            cachetable_misstime += get_tnow() - t0;
+            cachetable_miss.increment(1);
+            cachetable_misstime.increment(get_tnow() - t0);
             cachetable_unlock(ct);
             return TOKUDB_TRY_AGAIN;
         }
@@ -2216,7 +2219,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
     // if not found then create a pair in the READING state and fetch it
     if (p == 0) {
-        cachetable_prefetches++;
+        cachetable_prefetches.increment(1);
         r = bjm_add_background_job(cf->bjm);
         assert_zero(r);
         p = cachetable_insert_at(
@@ -3207,7 +3210,7 @@ toku_cleaner_thread (void *cachetable_v)
     assert(ct);
     u_int32_t num_iterations = toku_get_cleaner_iterations(ct);
     for (u_int32_t i = 0; i < num_iterations; ++i) {
-        cleaner_executions++;
+        cleaner_executions.increment(1);
         cachetable_lock(ct);
         PAIR best_pair = NULL;
         int n_seen = 0;
@@ -3310,10 +3313,6 @@
 void __attribute__((__constructor__)) toku_cachetable_helgrind_ignore(void);
 void
 toku_cachetable_helgrind_ignore(void) {
-    HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&cachetable_miss, sizeof cachetable_miss);
-    HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&cachetable_misstime, sizeof cachetable_misstime);
-    HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&cachetable_puts, sizeof cachetable_puts);
-    HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&cachetable_prefetches, sizeof cachetable_prefetches);
     HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&cachetable_evictions, sizeof cachetable_evictions);
     HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&cleaner_executions, sizeof cleaner_executions);
     HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&ct_status, sizeof ct_status);
This diff is collapsed.
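In miniature, the change applied throughout cachetable.cc above swaps each racy shared statistic for a PARTITIONED_COUNTER: the frequent increment becomes a thread-local write, and the infrequent engine-status read sums the partitions. A sketch of the pattern (hypothetical helper names, not code from the commit):

#include "partitioned_counter.h"

static PARTITIONED_COUNTER misses;      // was: static u_int64_t misses;

static void on_cache_miss(void) {
    misses.increment(1);                // was: misses++; -- a racy read-modify-write
}                                       // that Helgrind had to be told to ignore

static u_int64_t misses_for_status(void) {
    return misses.read();               // infrequent: sums all thread-local parts
}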
@@ -27,7 +27,15 @@
 // destroy_partitioned_counter Destroy it.
 // increment_partitioned_counter Increment it. This is the frequent operation.
 // read_partitioned_counter Get the current value. This is infrequent.
+// See partitioned_counter.cc for the abstraction function and representation invariant.
+//
+// Restrictions: You may not access a partitioned_counter during
+// destructor operation. So don't put engine-status in a destructor
+// or a destructor function.
+//
+#if 0
+// The old C interface. This required a bunch of explicit __attribute__((__destructor__)) functions to remember to destroy counters at the end.
 typedef struct partitioned_counter *PARTITIONED_COUNTER;
 PARTITIONED_COUNTER create_partitioned_counter(void);
 // Effect: Create a counter, initialized to zero.
@@ -35,11 +43,47 @@ PARTITIONED_COUNTER create_partitioned_counter(void);
 void destroy_partitioned_counter (PARTITIONED_COUNTER);
 // Effect: Destroy the counter. No operations on that counter are permitted after this.
-void increment_partitioned_counter (PARTITIONED_COUNTER, unsigned long amount);
+void increment_partitioned_counter (PARTITIONED_COUNTER, u_int64_t amount);
 // Effect: Increment the counter by amount.
 // Requires: No overflows. This is a 64-bit unsigned counter.
-unsigned long read_partitioned_counter (PARTITIONED_COUNTER);
+u_int64_t read_partitioned_counter (PARTITIONED_COUNTER);
 // Effect: Return the current value of the counter.
+#endif

+#include <pthread.h>
+#include "fttypes.h"

+// Used inside the PARTITIONED_COUNTER.
+struct linked_list_head {
+    struct linked_list_element *first;
+};

+class PARTITIONED_COUNTER {
+public:
+    PARTITIONED_COUNTER(void);
+    // Effect: Construct a counter, initialized to zero.
+    ~PARTITIONED_COUNTER(void);
+    // Effect: Destruct the counter.
+    void increment(u_int64_t amount);
+    // Effect: Increment the counter by amount. This is a 64-bit unsigned counter, and if you overflow it, you will get overflowed results (that is, mod 2^64).
+    // Requires: Don't use this from a static constructor or destructor.
+    u_int64_t read(void);
+    // Effect: Read the sum.
+    // Requires: Don't use this from a static constructor or destructor.
+private:
+    u_int64_t _sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
+    pthread_key_t _key; // The pthread_key which gives us the hook to construct and destruct thread-local storage.
+    struct linked_list_head _ll_counter_head; // A linked list of all the thread-local information for this counter.
+    // This function is used to destroy the thread-local part of the state when a thread terminates.
+    // But it's not the destructor for the local part of the counter, it's a destructor on a "dummy" key just so that we get a notification when a thread ends.
+    friend void destroy_thread_local_part_of_partitioned_counters (void *);
+};

 #endif
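A minimal usage sketch of the class interface above (a hypothetical example using plain pthreads; the real callers are the cachetable statistics shown earlier):

#include <assert.h>
#include <pthread.h>
#include "partitioned_counter.h"

static PARTITIONED_COUNTER hits;        // constructed to zero

static void *worker(void *ignore) {
    for (int i = 0; i < 1000000; i++) {
        hits.increment(1);              // touches only this thread's partition
    }
    return ignore;                      // on thread exit, the partition folds into _sum_of_dead
}

static void example(void) {
    pthread_t t[4];
    for (int i = 0; i < 4; i++) pthread_create(&t[i], NULL, worker, NULL);
    for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
    assert(hits.read() == 4000000);     // the infrequent operation: sum dead + live parts
}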
@@ -16,11 +16,11 @@
  * alf 16-core server (xeon E5-2665 2.4GHz) sandybridge
  *
  *   mork     mindy    bradley  alf
- *   0.3ns    1.07ns   1.27ns   0.58ns  to do a ++, but it's got a race in it.
- *   28.0ns   20.47ns  18.75ns  39.38ns to do a sync_fetch_and_add().
+ *   0.3ns    1.07ns   1.27ns   0.61ns  to do a ++, but it's got a race in it.
+ *   28.0ns   20.47ns  18.75ns  34.15ns to do a sync_fetch_and_add().
  *   0.4ns    0.29ns   0.71ns   0.19ns  to do with a single version of a counter
  *            0.33ns   0.69ns   0.18ns  pure thread-local variable (no way to add things up)
- *            0.76ns   1.50ns   0.35ns  partitioned_counter.c (using link-time optimization, otherwise the function all overwhelms everything)
+ *            0.76ns   2.40ns   0.54ns  partitioned_counter.c (using gcc link-time optimization, otherwise the function call overwhelms everything)
  *
  *
  * How it works. Each thread has a thread-local counter structure with an integer in it. To increment, we increment the thread-local structure.
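A distilled sketch of that mechanism, independent of the toku types (hypothetical names; the real code below additionally registers each thread's structure in a linked list and folds it into a finished count when the thread exits):

#include <pthread.h>

static __thread unsigned long local_count; // per-thread partition: increments are uncontended
static unsigned long dead_count;           // partitions inherited from exited threads
static pthread_mutex_t sum_lock = PTHREAD_MUTEX_INITIALIZER;

static inline void sketch_increment(void) {
    local_count++;                         // no shared cache line, no atomic instruction
}

static unsigned long sketch_read(void) {
    pthread_mutex_lock(&sum_lock);
    unsigned long sum = dead_count;        // plus, in the real code, a walk over every
                                           // live thread's local_count via the linked list
    pthread_mutex_unlock(&sum_lock);
    return sum;
}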
@@ -43,75 +43,101 @@
 #include "toku_assert.h"
 #include "partitioned_counter.h"
 #include "memory.h"
+#include "test.h"

+// The test code includes the fastest version I could figure out how to make, implemented below.
 struct counter_s {
     bool inited;
-    int counter;
+    volatile int counter;
     struct counter_s *prev, *next;
     int myid;
 };
 static __thread struct counter_s counter = {false,0, NULL,NULL,0};

 static int finished_counter=0; // counter for all threads that are done.

-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static struct counter_s *head=NULL, *tail=NULL;
+// We use a single mutex for anything complex. We'd like to use a mutex per partitioned counter, but we must cope with the possibility of a race between
+// a terminating pthread (which calls destroy_counter()), and a call to the counter destructor. So we use a global mutex.
+static pthread_mutex_t pc_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct counter_s *head=NULL;
 static pthread_key_t counter_key;

+static void pc_lock (void)
+// Effect: Lock the pc mutex.
+{
+    int r = pthread_mutex_lock(&pc_mutex);
+    assert(r==0);
+}

+static void pc_unlock (void)
+// Effect: Unlock the pc mutex.
+{
+    int r = pthread_mutex_unlock(&pc_mutex);
+    assert(r==0);
+}

-static void destroy_counter (void *counterp) {
+static void destroy_counter (void *counterp)
+// Effect: This is the function passed to pthread_key_create that is to run whenever a thread terminates.
+//   The thread-local part of the counter must be copied into the shared state, and the thread-local part of the counter must be
+//   removed from the linked list of all thread-local parts.
+{
     assert((struct counter_s*)counterp==&counter);
-    { int r = pthread_mutex_lock(&mutex); assert(r==0); }
+    pc_lock();
     if (counter.prev==NULL) {
         assert(head==&counter);
         head = counter.next;
     } else {
         counter.prev->next = counter.next;
     }
-    if (counter.next==NULL) {
-        assert(tail==&counter);
-        tail = counter.prev;
-    } else {
+    if (counter.next!=NULL) {
         counter.next->prev = counter.prev;
     }
     finished_counter += counter.counter;
     HELGRIND_VALGRIND_HG_ENABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // stop ignoring races
     //printf("finished counter now %d\n", finished_counter);
-    { int r = pthread_mutex_unlock(&mutex); assert(r==0); }
+    pc_unlock();
 }

 static int idcounter=0;

-static inline void increment (void) {
+static inline void increment (void)
+{
     if (!counter.inited) {
-        { int r = pthread_mutex_lock(&mutex); assert(r==0); }
-        { int r = pthread_setspecific(counter_key, &counter); assert(r==0); }
-        counter.prev = tail;
-        counter.next = NULL;
-        if (head==NULL) {
-            head = &counter;
-            tail = &counter;
-        } else {
-            tail->next = &counter;
-            tail = &counter;
+        pc_lock();
+        struct counter_s *cp = &counter;
+        { int r = pthread_setspecific(counter_key, cp); assert(r==0); }
+        cp->prev = NULL;
+        cp->next = head;
+        if (head!=NULL) {
+            head->prev = cp;
         }
-        counter.counter = 0;
-        counter.inited = true;
-        counter.myid = idcounter++;
+        head = cp;
+#ifdef __INTEL_COMPILER
+        __memory_barrier(); // for some reason I don't understand, ICC needs a memory barrier here. -Bradley
+#endif
+        cp->counter = 0;
+        cp->inited = true;
+        cp->myid = idcounter++;
         HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // the counter increment is kind of racy.
-        { int r = pthread_mutex_unlock(&mutex); assert(r==0); }
+        pc_unlock();
     }
     counter.counter++;
 }

 static int getvals (void) {
-    { int r = pthread_mutex_lock(&mutex); assert(r==0); }
+    pc_lock();
     int sum=finished_counter;
     for (struct counter_s *p=head; p; p=p->next) {
         sum+=p->counter;
     }
-    { int r = pthread_mutex_unlock(&mutex); assert(r==0); }
+    pc_unlock();
     return sum;
 }

+/**********************************************************************************/
+/* And now for some actual test code.                                             */
+/**********************************************************************************/

 static const int N=10000000;
 static const int T=20;
@@ -120,7 +146,7 @@ static const int T=20;
 PARTITIONED_COUNTER pc;
 static void *pc_doit (void *v) {
     for (int i=0; i<N; i++) {
-        increment_partitioned_counter(pc, 1);
+        pc.increment(1);
     }
     //printf("val=%ld\n", read_partitioned_counter(pc));
     return v;
@@ -212,47 +238,42 @@ static void parse_args (int argc, const char *argv[]) {
 static void do_timeit (void) {
     { int r = pthread_key_create(&counter_key, destroy_counter); assert(r==0); }
-    pc = create_partitioned_counter();
     printf("%d threads\n%d increments per thread\n", T, N);
     timeit("++", old_doit_nonatomic);
     timeit("atomic++", old_doit);
     timeit("fast", new_doit);
     timeit("puretl", tl_doit);
     timeit("pc", pc_doit);
-    destroy_partitioned_counter(pc);
 }

 struct test_arguments {
     PARTITIONED_COUNTER pc;
-    unsigned long limit;
-    unsigned long total_increment_per_writer;
-    volatile unsigned long unfinished_count;
+    u_int64_t limit;
+    u_int64_t total_increment_per_writer;
+    volatile u_int64_t unfinished_count;
 };

 static void *reader_test_fun (void *ta_v) {
     struct test_arguments *ta = (struct test_arguments *)ta_v;
-    unsigned long lastval = 0;
-    printf("reader starting\n");
+    u_int64_t lastval = 0;
     while (ta->unfinished_count>0) {
-        unsigned long thisval = read_partitioned_counter(ta->pc);
+        u_int64_t thisval = ta->pc.read();
         assert(lastval <= thisval);
         assert(thisval <= ta->limit);
         lastval = thisval;
         if (verboseness_cmdarg && (0==(thisval & (thisval-1)))) printf("Thisval=%ld\n", thisval);
     }
-    unsigned long thisval = read_partitioned_counter(ta->pc);
+    u_int64_t thisval = ta->pc.read();
     assert(thisval==ta->limit);
     return ta_v;
 }

 static void *writer_test_fun (void *ta_v) {
     struct test_arguments *ta = (struct test_arguments *)ta_v;
-    printf("writer starting\n");
-    for (unsigned long i=0; i<ta->total_increment_per_writer; i++) {
+    for (u_int64_t i=0; i<ta->total_increment_per_writer; i++) {
         if (i%1000 == 0) sched_yield();
-        increment_partitioned_counter(ta->pc, 1);
+        ta->pc.increment(1);
     }
-    printf("writer done\n");
     __sync_fetch_and_sub(&ta->unfinished_count, 1);
     return ta_v;
 }
@@ -260,45 +281,67 @@ static void *writer_test_fun (void *ta_v) {
 static void do_testit (void) {
     const int NGROUPS = 2;
-    PARTITIONED_COUNTER pcs[NGROUPS];
-    unsigned long limits[NGROUPS];
-    limits [0] = 2000000;
-    limits [1] = 1000000;
-    unsigned long n_writers[NGROUPS];
-    n_writers[0] = 20;
-    n_writers[1] = 40;
+    u_int64_t limits[NGROUPS];
+    limits [0] = 200000;
+    limits [1] = 100000;
+    u_int64_t n_writers[NGROUPS];
+    n_writers[0] = 2;
+    n_writers[1] = 4;
     struct test_arguments tas[NGROUPS];
     pthread_t reader_threads[NGROUPS];
     pthread_t *writer_threads[NGROUPS];
     for (int i=0; i<NGROUPS; i++) {
-        pcs[i] = create_partitioned_counter();
-        tas[i].pc = pcs[i];
         tas[i].limit = limits[i];
         tas[i].unfinished_count = n_writers[i];
         tas[i].total_increment_per_writer = limits[i]/n_writers[i];
         assert(tas[i].total_increment_per_writer * n_writers[i] == limits[i]);
         pt_create(&reader_threads[i], reader_test_fun, &tas[i]);
         MALLOC_N(n_writers[i], writer_threads[i]);
-        for (unsigned long j=0; j<n_writers[i] ; j++) {
+        for (u_int64_t j=0; j<n_writers[i] ; j++) {
             pt_create(&writer_threads[i][j], writer_test_fun, &tas[i]);
         }
     }
     for (int i=0; i<NGROUPS; i++) {
         pt_join(reader_threads[i], &tas[i]);
-        for (unsigned long j=0; j<n_writers[i] ; j++) {
+        for (u_int64_t j=0; j<n_writers[i] ; j++) {
             pt_join(writer_threads[i][j], &tas[i]);
         }
         toku_free(writer_threads[i]);
-        destroy_partitioned_counter(pcs[i]);
     }
 }
-int main (int argc, const char *argv[]) {
+volatile int spinwait=0;
+static void* test2_fun (void* mypc_v) {
+    PARTITIONED_COUNTER *mypc = (PARTITIONED_COUNTER*)mypc_v;
+    mypc->increment(3);
+    spinwait=1;
+    while (spinwait==1);
+    // mypc no longer points at a valid data structure.
+    return NULL;
+}

+static void do_testit2 (void)
+// This test checks to see what happens if a thread is still live when we destruct a counter.
+//   A thread increments the counter, then lets us know through a spin wait, then waits until we destroy the counter.
+{
+    pthread_t t;
+    {
+        PARTITIONED_COUNTER mypc;
+        pt_create(&t, test2_fun, &mypc);
+        while (spinwait==0); // wait until he incremented the counter.
+        assert(mypc.read()==3);
+    } // leave scope, so the counter goes away.
+    spinwait=2; // tell the other guy to finish up.
+    pt_join(t, NULL);
+}

+int test_main (int argc, const char *argv[]) {
     parse_args(argc, argv);
     if (time_cmdarg) {
         do_timeit();
     } else {
         do_testit();
+        do_testit2();
     }
     return 0;
 }