Commit 24ca9e75 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Fix #5295, #5292. {{{svn merge -r46285:46297 ../tokudb.5295b}}}

git-svn-id: file:///svn/toku/tokudb@46309 c7de825b-a66e-492c-adef-691d508d4ae1
parent c28b01aa
......@@ -20,6 +20,7 @@
#include "log-internal.h"
#include "kibbutz.h"
#include "background_job_manager.h"
#include "partitioned_counter.h"
///////////////////////////////////////////////////////////////////////////////////
// Engine status
......@@ -30,12 +31,40 @@
// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
// They were left here after engine status cleanup (#2949, rather than moved into the status struct)
// so they are still easily available to the debugger and to save lots of typing.
static uint64_t cachetable_miss;
static uint64_t cachetable_misstime; // time spent waiting for disk read
static uint64_t cachetable_puts; // how many times has a newly created node been put into the cachetable?
static uint64_t cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static uint64_t cachetable_evictions;
static uint64_t cleaner_executions; // number of times the cleaner thread's loop has executed
// If we had constructors and destructors, this would be cleaner. For now, we initialize with setup_cachetable_statistics().
static PARTITIONED_COUNTER cachetable_miss;
static PARTITIONED_COUNTER cachetable_misstime; // time spent waiting for disk read
static PARTITIONED_COUNTER cachetable_puts; // how many times has a newly created node been put into the cachetable?
static PARTITIONED_COUNTER cachetable_prefetches; // how many times has a block been prefetched into the cachetable?
static PARTITIONED_COUNTER cachetable_evictions;
static PARTITIONED_COUNTER cleaner_executions; // number of times the cleaner thread's loop has executed
static bool cachetables_inited = false;
void toku_cachetables_init(void) {
assert(!cachetables_inited);
cachetables_inited = true;
cachetable_miss = create_partitioned_counter();
cachetable_misstime = create_partitioned_counter();
cachetable_puts = create_partitioned_counter();
cachetable_prefetches = create_partitioned_counter();
cachetable_evictions = create_partitioned_counter();
cleaner_executions = create_partitioned_counter();
}
void toku_cachetables_destroy(void) {
#define DESTROY(x) destroy_partitioned_counter(x); x=NULL;
assert(cachetables_inited);
cachetables_inited = false;
DESTROY(cachetable_miss);
DESTROY(cachetable_misstime);
DESTROY(cachetable_puts);
DESTROY(cachetable_prefetches);
DESTROY(cachetable_evictions);
DESTROY(cleaner_executions);
#undef DESTROY
}
static CACHETABLE_STATUS_S ct_status;
......@@ -183,10 +212,10 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
if (!ct_status.initialized) {
status_init();
}
STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
STATUS_VALUE(CT_PUTS) = cachetable_puts;
STATUS_VALUE(CT_PREFETCHES) = cachetable_prefetches;
STATUS_VALUE(CT_MISS) = read_partitioned_counter(cachetable_miss);
STATUS_VALUE(CT_MISSTIME) = read_partitioned_counter(cachetable_misstime);
STATUS_VALUE(CT_PUTS) = read_partitioned_counter(cachetable_puts);
STATUS_VALUE(CT_PREFETCHES) = read_partitioned_counter(cachetable_prefetches);
STATUS_VALUE(CT_SIZE_CURRENT) = ct->size_current;
STATUS_VALUE(CT_SIZE_LIMIT) = ct->size_limit;
STATUS_VALUE(CT_SIZE_WRITING) = ct->size_evicting;
......@@ -194,8 +223,8 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
STATUS_VALUE(CT_SIZE_LEAF) = ct->size_leaf;
STATUS_VALUE(CT_SIZE_ROLLBACK) = ct->size_rollback;
STATUS_VALUE(CT_SIZE_CACHEPRESSURE) = ct->size_cachepressure;
STATUS_VALUE(CT_EVICTIONS) = cachetable_evictions;
STATUS_VALUE(CT_CLEANER_EXECUTIONS) = cleaner_executions;
STATUS_VALUE(CT_EVICTIONS) = read_partitioned_counter(cachetable_evictions);
STATUS_VALUE(CT_CLEANER_EXECUTIONS) = read_partitioned_counter(cleaner_executions);
STATUS_VALUE(CT_CLEANER_PERIOD) = toku_get_cleaner_period_unlocked(ct);
STATUS_VALUE(CT_CLEANER_ITERATIONS) = toku_get_cleaner_iterations_unlocked(ct);
*statp = ct_status;
......@@ -857,7 +886,7 @@ static void cachetable_free_pair(CACHETABLE ct, PAIR p) {
void *write_extraargs = p->write_extraargs;
PAIR_ATTR old_attr = p->attr;
cachetable_evictions++;
increment_partitioned_counter(cachetable_evictions, 1);
cachetable_unlock(ct);
PAIR_ATTR new_attr = p->attr;
// Note that flush_callback is called with write_me false, so the only purpose of this
......@@ -1279,7 +1308,7 @@ static int cachetable_put_internal(
}
}
// flushing could change the table size, but wont' change the fullhash
cachetable_puts++;
increment_partitioned_counter(cachetable_puts, 1);
PAIR p = cachetable_insert_at(
ct,
cachefile,
......@@ -1916,8 +1945,8 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
// The pair being fetched will be marked as pending if a checkpoint happens during the
// fetch because begin_checkpoint will mark as pending any pair that is locked even if it is clean.
cachetable_fetch_pair(ct, cachefile, p, fetch_callback, read_extraargs, true);
cachetable_miss++;
cachetable_misstime += get_tnow() - t0;
increment_partitioned_counter(cachetable_miss, 1);
increment_partitioned_counter(cachetable_misstime, get_tnow() - t0);
goto got_value;
}
got_value:
......@@ -2142,8 +2171,8 @@ int toku_cachetable_get_and_pin_nonblocking (
run_unlockers(unlockers); // we hold the ct mutex.
uint64_t t0 = get_tnow();
cachetable_fetch_pair(ct, cf, p, fetch_callback, read_extraargs, false);
cachetable_miss++;
cachetable_misstime += get_tnow() - t0;
increment_partitioned_counter(cachetable_miss, 1);
increment_partitioned_counter(cachetable_misstime, get_tnow() - t0);
cachetable_unlock(ct);
return TOKUDB_TRY_AGAIN;
}
......@@ -2216,7 +2245,7 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, uint32_t fullhash,
// if not found then create a pair in the READING state and fetch it
if (p == 0) {
cachetable_prefetches++;
increment_partitioned_counter(cachetable_prefetches, 1);
r = bjm_add_background_job(cf->bjm);
assert_zero(r);
p = cachetable_insert_at(
......@@ -3207,7 +3236,7 @@ toku_cleaner_thread (void *cachetable_v)
assert(ct);
uint32_t num_iterations = toku_get_cleaner_iterations(ct);
for (uint32_t i = 0; i < num_iterations; ++i) {
cleaner_executions++;
increment_partitioned_counter(cleaner_executions, 1);
cachetable_lock(ct);
PAIR best_pair = NULL;
int n_seen = 0;
......
......@@ -507,5 +507,11 @@ extern int toku_cachetable_get_checkpointing_user_data_status(void);
int
toku_cleaner_thread (void *cachetable_v);
void toku_cachetables_init (void);
// Effect: Initialize the cachetables module. CDall this before calling any other cachetable operations.
void toku_cachetables_destroy (void);
// Effect: Deinitialize the cachetables module. CDall this after calling any other cachetable operations to free resources that may have been allocated.
// To use the cachetable module again, call toku_cachetables_init() again.
#endif
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef _DOUBLY_LINKED_LIST_H_
#define _DOUBLY_LINKED_LIST_H_
#ident "$Id: partitioned_counter.cc 46098 2012-07-24 21:58:41Z bkuszmaul $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
//******************************************************************************
//
// Overview: A doubly linked list with elements of type T.
// Each element that wants to be put into the list provides a
// LinkedListElement<T> as well as a pointer to the the object of type T.
// Typically, the user embeds the linked list element into the object itself,
// for example as
// struct foo {
// toku::LinkedListElement<struct foo *> linked_list_elt;
// ... other elements of foo
// };
// then when inserting foo into a list defined as
// toku::DoublyLinkedList<struct foo *> list_of_foos;
// you write
// struct foo f;
// list_of_foos->insert(&f->linked_list_elt, &f);
//
// Operations: Constructor and deconstructors are provided (they don't
// need to anything but fill in a field) for the DoublyLinkedList.
// Operations to insert an element and remove it, as well as to pop
// an element out of the list.
// Also a LinkedListElement class is provided with a method to get a
// pointer to the object of type T.
//******************************************************************************
#include <stdbool.h>
//#define BEGIN_TOKUNAMESPACE namespace toku {
//#define END_TOKUNAMESPACE };
#define BEGIN_TOKUNAMESPACE
#define END_TOKUNAMESPACE
BEGIN_TOKUNAMESPACE
template<typename T> class DoublyLinkedList;
template<typename T> class LinkedListElement {
friend class DoublyLinkedList<T>;
private:
T container;
LinkedListElement<T> *prev, *next;
public:
T get_container(void) {
return container;
}
};
template<typename T> class DoublyLinkedList {
public:
void init (void);
// Effect: Initialize a doubly linked list (to be empty).
void insert(LinkedListElement<T> *ll_elt, T container);
// Effect: Add an item to a linked list.
// Implementation note: Push the item to the head of the list.
void remove(LinkedListElement<T> *ll_elt);
// Effect: Remove an item from a linked list.
// Requires: The item is in the list identified by head.
bool pop(LinkedListElement<T> **ll_eltp);
// Effect: if the list is empty, return false.
// Otherwise return true and set *ll_eltp to the first item, and remove that item from the list.
template<typename extra_t> int iterate(int (*fun)(T container, extra_t extra), extra_t extra);
// Effect: Call fun(e, extra) on every element of the linked list. If ever fun returns nonzero, then quit early and return that value.
// If fun always return zero, then this function returns zero.
private:
LinkedListElement<T> *m_first;
};
//******************************************************************************
// DoublyLinkedList implementation starts here.
//******************************************************************************
#include <stddef.h>
template<typename T> void DoublyLinkedList<T>::init(void) {
m_first = NULL;
}
template<typename T> void DoublyLinkedList<T>::insert(LinkedListElement<T> *ll_elt, T container) {
LinkedListElement<T> *old_first = m_first;
ll_elt->container = container;
ll_elt->next = old_first;
ll_elt->prev = NULL;
if (old_first!=NULL) {
old_first->prev = ll_elt;
}
m_first = ll_elt;
}
template<typename T> void DoublyLinkedList<T>::remove(LinkedListElement<T> *ll_elt) {
LinkedListElement<T> *old_prev = ll_elt->prev;
LinkedListElement<T> *old_next = ll_elt->next;
if (old_prev==NULL) {
m_first = old_next;
} else {
old_prev->next = old_next;
}
if (old_next==NULL) {
/* nothing */
} else {
old_next->prev = old_prev;
}
}
template<typename T> bool DoublyLinkedList<T>::pop(LinkedListElement<T> **ll_eltp) {
LinkedListElement<T> *first = m_first;
if (first) {
assert(first->prev==NULL);
m_first = first->next;
if (first->next) {
first->next->prev = NULL;
}
first->next=NULL;
*ll_eltp = first;
return true;
} else {
return false;
}
}
template<typename T>
template<typename extra_t>
int DoublyLinkedList<T>::iterate(int (*fun)(T container, extra_t extra), extra_t extra) {
for (LinkedListElement<T> *le = m_first; le; le=le->next) {
int r = fun(le->container, extra);
if (r!=0) return r;
}
return 0;
}
END_TOKUNAMESPACE
#endif
......@@ -127,6 +127,7 @@ basement nodes, bulk fetch, and partial fetch:
#include <ft-flusher.h>
#include <valgrind/helgrind.h>
#include "txn_manager.h"
#include "partitioned_counter.h"
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
......@@ -5498,10 +5499,10 @@ int toku_ft_layer_init(void) {
r = toku_portability_init();
if (r) { goto exit; }
partitioned_counters_init();
toku_checkpoint_init();
toku_ft_serialize_layer_init();
toku_cachetables_init();
toku_mutex_init(&ft_open_close_lock, NULL);
exit:
return r;
......@@ -5509,9 +5510,10 @@ exit:
void toku_ft_layer_destroy(void) {
toku_mutex_destroy(&ft_open_close_lock);
toku_cachetables_destroy();
toku_ft_serialize_layer_destroy();
toku_checkpoint_destroy();
partitioned_counters_destroy();
//Portability must be cleaned up last
toku_portability_destroy();
}
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef _GROWABLE_ARRAY_H_
#define _GROWABLE_ARRAY_H_
#ident "$Id: partitioned_counter.cc 46098 2012-07-24 21:58:41Z bkuszmaul $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
//******************************************************************************
//
// Overview: A growable array is a little bit like std::vector except that
// it doesn't have constructors (hence can be used in static constructs, since
// the google style guide says no constructors), and it's a little simpler.
// Operations:
// init and deinit (we don't have constructors and destructors).
// fetch_unchecked to get values out.
// store_unchecked to put values in.
// push to add an element at the end
// get_size to find out the size
// get_memory_size to find out how much memory the data stucture is using.
//
//******************************************************************************
//#define BEGIN_TOKUNAMESPACE namespace toku {
//#define END_TOKUNAMESPACE };
#define BEGIN_TOKUNAMESPACE
#define END_TOKUNAMESPACE
BEGIN_TOKUNAMESPACE
template<typename T> class GrowableArray {
public:
void init (void)
// Effect: Initialize the array to contain no elements.
{
m_array=NULL;
m_size=0;
m_size_limit=0;
}
void deinit (void)
// Effect: Deinitialize the array (freeing any memory it uses, for example).
{
toku_free(m_array);
m_array =NULL;
m_size =0;
m_size_limit=0;
}
T fetch_unchecked (size_t i)
// Effect: Fetch the ith element. If i is out of range, the system asserts.
{
return m_array[i];
}
void store_unchecked (size_t i, T v)
// Effect: Store v in the ith element. If i is out of range, the system asserts.
{
assert(i<m_size);
m_array[i]=v;
}
void push (T v)
// Effect: Add v to the end of the array (increasing the size). The amortized cost of this operation is constant.
// Implementation hint: Double the size of the array when it gets too big so that the amortized cost stays constant.
{
if (m_size>=m_size_limit) {
if (m_array==NULL) {
m_size_limit=1;
} else {
m_size_limit*=2;
}
XREALLOC_N(m_size_limit, m_array);
}
m_array[m_size++]=v;
}
size_t get_size (void)
// Effect: Return the number of elements in the array.
{
return m_size;
}
size_t memory_size(void)
// Effect: Return the size (in bytes) that the array occupies in memory. This is really only an estimate.
{
return sizeof(*this)+sizeof(T)*m_size_limit;
}
private:
T *m_array;
size_t m_size;
size_t m_size_limit; // How much space is allocated in array.
};
END_TOKUNAMESPACE
#endif
......@@ -3,135 +3,298 @@
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "partitioned_counter.h"
#include "memory.h"
#include <pthread.h>
#include <valgrind/helgrind.h>
#include <sys/types.h>
#include <pthread.h>
struct local_counter {
unsigned long sum;
struct local_counter *prev, *next;
PARTITIONED_COUNTER owner;
};
#include "memory.h"
#include "partitioned_counter.h"
#include "doubly_linked_list.h"
#include "growable_array.h"
struct partitioned_counter {
unsigned long sum_of_dead;
pthread_key_t key;
struct local_counter *first, *last;
};
//******************************************************************************
// Representation: The representation of a partitioned counter
// comprises a sum, called sum_of_dead; an index, called the ckey,
// which indexes into a thread-local array to find a thread-local
// part of the counter; and a linked list of thread-local parts.
// There is also a linked list, for each thread that has a
// thread-local part of any counter, of all the thread-local parts of
// all the counters.
// Abstraction function: The sum is represented by the sum of _sum and
// the sum's of the thread-local parts of the counter.
// Representation invariant: Every thread-local part is in the linked
// list of the thread-local parts of its counter, as well as in the
// linked list of the counters of a the thread.
//******************************************************************************
//******************************************************************************
// The mutex for the PARTITIONED_COUNTER
// We have a single mutex for all the counters because
// (a) the mutex is obtained infrequently, and
// (b) it helps us avoid race conditions when destroying the counters.
static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
// The alternative that I couldn't make work is to have a mutex per counter.
// But the problem is that the counter can be destroyed before threads
// terminate, or maybe a thread terminates before the counter is destroyed.
// If the counter is destroyed first, then the mutex is no longer available.
//******************************************************************************
static pthread_mutex_t partitioned_counter_mutex = PTHREAD_MUTEX_INITIALIZER;
static void pc_lock (void) {
static void pc_lock (void)
// Effect: Lock the mutex.
{
int r = pthread_mutex_lock(&partitioned_counter_mutex);
assert(r==0);
}
static void pc_unlock (void) {
static void pc_unlock (void)
// Effect: Unlock the mutex.
{
int r = pthread_mutex_unlock(&partitioned_counter_mutex);
assert(r==0);
}
static void local_destroy_counter (void *counterp) {
//******************************************************************************
// Key creation primivites.
//******************************************************************************
static void pk_create (pthread_key_t *key, void (*destructor)(void*)) {
int r = pthread_key_create(key, destructor);
assert(r==0);
}
static void pk_delete (pthread_key_t key) {
int r = pthread_key_delete(key);
assert(r==0);
}
static void pk_setspecific (pthread_key_t key, const void *value) {
int r = pthread_setspecific(key, value);
assert(r==0);
}
//******************************************************************************
// The counter itself.
// The thread local part of a counter, comprising the thread-local sum a pointer
// to the partitioned_counter, a pointer to the thread_local list head, and two
// linked lists. One of the lists is all the thread-local parts that belong to
// the same counter, and the other is all the thread-local parts that belogn to
// the same thread.
//******************************************************************************
struct local_counter;
struct partitioned_counter {
uint64_t sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
uint64_t pc_key; // A unique integer among all counters that have been created but not yet destroyed.
DoublyLinkedList<struct local_counter *> ll_counter_head; // A linked list of all the thread-local information for this counter.
};
struct local_counter {
uint64_t sum; // The thread-local sum.
PARTITIONED_COUNTER owner_pc; // The partitioned counter that this is part of.
GrowableArray<struct local_counter *> *thread_local_array; // The thread local array for this thread holds this local_counter at offset owner_pc->pc_key.
LinkedListElement<struct local_counter *> ll_in_counter; // Element for the doubly-linked list of thread-local information for this PARTITIONED_COUNTER.
};
// Try to get it it into one cache line by aligning it.
static __thread GrowableArray<struct local_counter *> thread_local_array;
// I want this to be static, but I have to use hidden visibility instead because it's a friend function.
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me);
static void destroy_thread_local_part_of_partitioned_counters (void *ignore_me __attribute__((__unused__)))
// Effect: This function is called whenever a thread terminates using the
// destructor of the thread_destructor_key (defined below). First grab the
// lock, then go through all the partitioned counters and removes the part that
// is local to this thread. We don't actually need the contents of the
// thread_destructor_key except to cause this function to run. The content of
// the key is a static string, so don't try to free it.
{
pc_lock();
struct local_counter *CAST_FROM_VOIDP(lc, counterp);
PARTITIONED_COUNTER owner = lc->owner;
// Save the sum
for (size_t i=0; i<thread_local_array.get_size(); i++) {
struct local_counter *lc = thread_local_array.fetch_unchecked(i);
if (lc==NULL) continue;
PARTITIONED_COUNTER owner = lc->owner_pc;
owner->sum_of_dead += lc->sum;
// Remove from linked list.
if (lc->prev) {
lc->prev->next = lc->next;
} else {
owner->first = lc->next;
owner->ll_counter_head.remove(&lc->ll_in_counter);
toku_free(lc);
}
if (lc->next) {
lc->next->prev = lc->prev;
thread_local_array.deinit();
pc_unlock();
}
//******************************************************************************
// We employ a system-wide pthread_key simply to get a notification when a
// thread terminates. The key will simply contain a constant string (it's "dont
// care", but it doesn't matter what it is, as long as it's not NULL. We need
// a constructor function to set up the pthread_key. We used a constructor
// function intead of a C++ constructor because that's what we are used to,
// rather than because it's necessarily better. Whenever a thread tries to
// increment a partitioned_counter for the first time, it sets the
// pthread_setspecific for the thread_destructor_key. It's OK if the key gets
// setspecific multiple times, it's always the same value. When a thread (that
// has created a thread-local part of any partitioned counter) terminates, the
// destroy_thread_local_part_of_partitioned_counters will run. It may run
// before or after other pthread_key destructors, but the thread-local
// ll_thread_head variable is still present until the thread is completely done
// running.
//******************************************************************************
static pthread_key_t thread_destructor_key;
//******************************************************************************
// We don't like using up pthread_keys (macos provides only 128 of them),
// so we built our own.
//******************************************************************************
bool *counters_in_use = NULL;
uint64_t counters_in_use_size = 0;
static uint64_t allocate_counter (void)
// Effect: Find an unused counter number, and allocate it, returning the counter number.
// Requires: The pc mutex is held before calling.
{
for (uint64_t i=0; i<counters_in_use_size; i++) {
if (!counters_in_use[i]) {
counters_in_use[i]=true;
return i;
}
}
uint64_t old_size = counters_in_use_size;
if (counters_in_use_size==0) {
counters_in_use_size = 1;
} else {
owner->last = lc->prev;
counters_in_use_size *= 2;
}
// Free the local part of the counter and return.
toku_free(lc);
{
int r = pthread_setspecific(owner->key, NULL);
assert(r==0);
XREALLOC_N(counters_in_use_size, counters_in_use);
for (uint64_t i=old_size; i<counters_in_use_size; i++) {
counters_in_use[i] = false;
}
pc_unlock();
assert(old_size < counters_in_use_size);
counters_in_use[old_size] = true;
return old_size;
}
static void free_counter(uint64_t counternum)
// Effect: Free a counter.
// Requires: The pc mutex is held before calling.
{
assert(counternum < counters_in_use_size);
assert(counters_in_use[counternum]);
counters_in_use[counternum] = false;
}
static void destroy_counters (void) {
toku_free(counters_in_use);
counters_in_use=NULL;
counters_in_use_size=0;
}
//******************************************************************************
// Now for the code that actually creates a counter.
//******************************************************************************
PARTITIONED_COUNTER create_partitioned_counter(void)
// Effect: Create a counter, initialized to zero.
{
PARTITIONED_COUNTER MALLOC(result);
PARTITIONED_COUNTER XMALLOC(result);
result->sum_of_dead = 0;
{
int r = pthread_key_create(&result->key, local_destroy_counter);
assert(r==0);
}
result->first = NULL;
result->last = NULL;
result->pc_key = allocate_counter();
result->ll_counter_head.init();
return result;
}
void destroy_partitioned_counter (PARTITIONED_COUNTER pc)
// Effect: Destroy the counter. No operations on that counter are permitted after this.
// Implementation note: Since we have a global lock, we can destroy all the key-specific versions as well.
void destroy_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Destroy the counter. No operations on this counter are permitted after.
// Implementation note: Since we have a global lock, we can destroy all the thread-local
// versions as well.
{
pc_lock();
while (pc->first) {
struct local_counter *next = pc->first->next;
assert(pc->first->owner==pc);
toku_free(pc->first);
pc->first = next;
}
{
int r = pthread_key_delete(pc->key);
assert(r==0);
uint64_t pc_key = pc->pc_key;
LinkedListElement<struct local_counter *> *first;
while (pc->ll_counter_head.pop(&first)) {
// We just removed first from the counter list, now we must remove it from the thread-local array.
struct local_counter *lc = first->get_container();
assert(pc == lc->owner_pc);
GrowableArray<struct local_counter *> *tla = lc->thread_local_array;
tla->store_unchecked(pc_key, NULL);
toku_free(lc);
}
toku_free(pc);
free_counter(pc_key);
pc_unlock();
}
void increment_partitioned_counter (PARTITIONED_COUNTER pc, unsigned long amount)
static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, GrowableArray<struct local_counter *> *a)
{
if (pc_key >= a->get_size()) {
return NULL;
} else {
return a->fetch_unchecked(pc_key);
}
}
void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
// Effect: Increment the counter by amount.
// Requires: No overflows. This is a 64-bit unsigned counter.
// Requires: You may not increment this after a destroy has occured.
{
struct local_counter *CAST_FROM_VOIDP(lc, pthread_getspecific(pc->key));
// Only this thread is allowed to modify thread_local_array, except for setting tla->array[pc_key] to NULL
// when a counter is destroyed (and in that case there should be no race because no other thread should be
// trying to access the same local counter at the same time.
uint64_t pc_key = pc->pc_key;
struct local_counter *lc = get_thread_local_counter(pc_key, &thread_local_array);
if (lc==NULL) {
pc_lock();
MALLOC(lc);
// Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters.
pk_setspecific(thread_destructor_key, "dont care");
XMALLOC(lc);
lc->sum = 0;
HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy.
lc->prev = pc->last;
lc->next = NULL;
lc->owner = pc;
if (pc->first==NULL) {
pc->first = lc;
} else {
pc->last->next = lc;
lc->owner_pc = pc;
lc->thread_local_array = &thread_local_array;
pc_lock(); // Might as well do the malloc without holding the pc lock. But the rest of this work needs the lock.
// Grow the array if needed, filling in NULLs
while (thread_local_array.get_size() <= pc_key) {
thread_local_array.push(NULL);
}
pc->last = lc;
if (pc->first==NULL) pc->first=lc;
int r = pthread_setspecific(pc->key, lc);
assert(r==0);
thread_local_array.store_unchecked(pc_key, lc);
pc->ll_counter_head.insert(&lc->ll_in_counter, lc);
pc_unlock();
}
lc->sum += amount;
}
unsigned long read_partitioned_counter (PARTITIONED_COUNTER pc)
static int sumit(struct local_counter *lc, uint64_t *sum) {
(*sum)+=lc->sum;
return 0;
}
uint64_t read_partitioned_counter(PARTITIONED_COUNTER pc)
// Effect: Return the current value of the counter.
// Implementation note: Sum all the thread-local counts along with the sum_of_the_dead.
{
pc_lock();
unsigned long sum = pc->sum_of_dead;
for (struct local_counter *lc = pc->first; lc; lc=lc->next) {
sum += lc->sum;
}
uint64_t sum = pc->sum_of_dead;
int r = pc->ll_counter_head.iterate<uint64_t *>(sumit, &sum);
assert(r==0);
pc_unlock();
return sum;
}
void partitioned_counters_init(void)
// Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
{
pk_create(&thread_destructor_key, destroy_thread_local_part_of_partitioned_counters);
}
void partitioned_counters_destroy(void)
// Effect: Destroy any partitioned counters data structures.
{
pk_delete(thread_destructor_key);
destroy_counters();
}
......@@ -27,19 +27,74 @@
// destroy_partitioned_counter Destroy it.
// increment_partitioned_counter Increment it. This is the frequent operation.
// read_partitioned_counter Get the current value. This is infrequent.
// See partitioned_counter.cc for the abstraction function and representation invariant.
//
// The google style guide says to avoid using constructors, and it appears that
// constructors may have broken all the tests, because they called
// pthread_key_create before the key was actually created. So the google style
// guide may have some wisdom there...
//
// This version does not use constructors, essentially reverrting to the google C++ style guide.
//
#include "fttypes.h"
// The old C interface. This required a bunch of explicit ___attribute__((__destructor__)) functions to remember to destroy counters at the end.
typedef struct partitioned_counter *PARTITIONED_COUNTER;
PARTITIONED_COUNTER create_partitioned_counter(void);
// Effect: Create a counter, initialized to zero.
void destroy_partitioned_counter (PARTITIONED_COUNTER);
void destroy_partitioned_counter(PARTITIONED_COUNTER);
// Effect: Destroy the counter. No operations on that counter are permitted after this.
void increment_partitioned_counter (PARTITIONED_COUNTER, unsigned long amount);
void increment_partitioned_counter(PARTITIONED_COUNTER, uint64_t amount);
// Effect: Increment the counter by amount.
// Requires: No overflows. This is a 64-bit unsigned counter.
unsigned long read_partitioned_counter (PARTITIONED_COUNTER);
uint64_t read_partitioned_counter(PARTITIONED_COUNTER);
// Effect: Return the current value of the counter.
void partitioned_counters_init(void);
// Effect: Initialize any partitioned counters data structures that must be set up before any partitioned counters run.
void partitioned_counters_destroy(void);
// Effect: Destroy any partitioned counters data structures.
#if 0
#include <pthread.h>
#include "fttypes.h"
// Used inside the PARTITIONED_COUNTER.
struct linked_list_head {
struct linked_list_element *first;
};
class PARTITIONED_COUNTER {
public:
PARTITIONED_COUNTER(void);
// Effect: Construct a counter, initialized to zero.
~PARTITIONED_COUNTER(void);
// Effect: Destruct the counter.
void increment(uint64_t amount);
// Effect: Increment the counter by amount. This is a 64-bit unsigned counter, and if you overflow it, you will get overflowed results (that is mod 2^64).
// Requires: Don't use this from a static constructor or destructor.
uint64_t read(void);
// Effect: Read the sum.
// Requires: Don't use this from a static constructor or destructor.
private:
uint64_t _sum_of_dead; // The sum of all thread-local counts from threads that have terminated.
pthread_key_t _key; // The pthread_key which gives us the hook to construct and destruct thread-local storage.
struct linked_list_head _ll_counter_head; // A linked list of all the thread-local information for this counter.
// This function is used to destroy the thread-local part of the state when a thread terminates.
// But it's not the destructor for the local part of the counter, it's a destructor on a "dummy" key just so that we get a notification when a thread ends.
friend void destroy_thread_local_part_of_partitioned_counters (void *);
};
#endif
#endif
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: test_partitioned_counter.cc 46088 2012-07-24 17:30:28Z bkuszmaul $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "test.h"
#include "doubly_linked_list.h"
static void check_is_empty (DoublyLinkedList<int> *l) {
LinkedListElement<int> *re;
bool r = l->pop(&re);
assert(!r);
}
static void test_doubly_linked_list (void) {
DoublyLinkedList<int> l;
l.init();
LinkedListElement<int> e0, e1;
l.insert(&e0, 3);
{
LinkedListElement<int> *re;
bool r = l.pop(&re);
assert(r);
assert(re==&e0);
assert(re->get_container()==3);
}
check_is_empty(&l);
l.insert(&e0, 0);
l.insert(&e1, 1);
{
bool in[2]={true,true};
for (int i=0; i<2; i++) {
LinkedListElement<int> *re;
bool r = l.pop(&re);
assert(r);
int v = re->get_container();
assert(v==0 || v==1);
assert(in[v]);
in[v]=false;
}
}
check_is_empty(&l);
}
const int N=100;
bool in[N];
DoublyLinkedList<int> l;
LinkedListElement<int> elts[N];
static void maybe_insert_random(void) {
int x = random()%N;
if (!in[x]) {
if (verbose) printf("I%d ", x);
l.insert(&elts[x], x);
in[x]=true;
}
}
static bool checked[N];
static int check_count;
static int check_is_in(int v, int deadbeef) {
assert(deadbeef=0xdeadbeef);
assert(0<=v && v<N);
assert(!checked[v]);
assert(in[v]);
checked[v]=true;
check_count++;
return 0;
}
static int quit_count=0;
static int quit_early(int v __attribute__((__unused__)), int beefbeef) {
assert(beefbeef=0xdeadbeef);
quit_count++;
if (quit_count==check_count) return check_count;
else return 0;
}
static void check_equal(void) {
check_count=0;
for (int i=0; i<N; i++) checked[i]=false;
{
int r = l.iterate<int>(check_is_in, 0xdeadbeef);
assert(r==0);
}
for (int i=0; i<N; i++) assert(checked[i]==in[i]);
if (check_count>0) {
check_count=1+random()%check_count; // quit after 1 or more iterations
quit_count=0;
int r = l.iterate<int>(quit_early, 0xbeefbeef);
assert(r==check_count);
}
}
static void test_doubly_linked_list_randomly(void) {
l.init();
for (int i=0; i<N; i++) in[i]=false;
for (int i=0; i<N/2; i++) maybe_insert_random();
if (verbose) printf("\n");
for (int i=0; i<N*N; i++) {
int x = random()%N;
if (in[x]) {
if (random()%2==0) {
if (verbose) printf("%dR%d ", i, x);
l.remove(&elts[x]);
in[x]=false;
} else {
LinkedListElement<int> *re;
bool r = l.pop(&re);
assert(r);
int v = re->get_container();
assert(in[v]);
in[v]=false;
if (verbose) printf("%dP%d ", i, v);
}
} else {
l.insert(&elts[x], x);
in[x]=true;
if (verbose) printf("%dI%d ", i, x);
}
check_equal();
}
if (verbose) printf("\n");
LinkedListElement<int> *re;
while (l.pop(&re)) {
int v = re->get_container();
assert(in[v]);
in[v]=false;
if (verbose) printf("P%d ", v);
}
for (int i=0; i<N; i++) assert(!in[i]);
if (verbose) printf("\n");
}
int test_main (int argc, const char *argv[]) {
default_parse_args(argc, argv);
test_doubly_linked_list();
for (int i=0; i<4; i++) {
test_doubly_linked_list_randomly();
}
return 0;
}
......@@ -16,12 +16,16 @@
* alf 16-core server (xeon E5-2665 2.4GHz) sandybridge
*
* mork mindy bradley alf
* 0.3ns 1.07ns 1.27ns 0.58ns to do a ++, but it's got a race in it.
* 28.0ns 20.47ns 18.75ns 39.38ns to do a sync_fetch_and_add().
* 0.4ns 0.29ns 0.71ns 0.19ns to do with a single version of a counter
* 0.33ns 0.69ns 0.18ns pure thread-local variable (no way to add things up)
* 1.22ns 1.07ns 1.27ns 0.61ns to do a ++, but it's got a race in it.
* 27.11ns 20.47ns 18.75ns 34.15ns to do a sync_fetch_and_add().
* 0.26ns 0.29ns 0.71ns 0.19ns to do with a single version of a counter
* 0.35ns 0.33ns 0.69ns 0.18ns pure thread-local variable (no way to add things up)
* 0.76ns 1.50ns 0.35ns partitioned_counter.c (using link-time optimization, otherwise the function all overwhelms everything)
* 2.21ns 3.32ns 0.70ns partitioned_counter.c (using gcc, the C version at r46097, not C++) This one is a little slower because it has an extra branch in it.
*
* Surprisingly, compiling this code without -fPIC doesn't make it any faster (even the pure thread-local variable is the same). -fPIC access to
* thread-local variables look slower since they have a function all, but they don't seem to be any slower in practice. In fact, even the puretl-ptr test
* which simply increments a thread-local pointer is basically the same speed as accessing thread_local variable.
*
* How it works. Each thread has a thread-local counter structure with an integer in it. To increment, we increment the thread-local structure.
* The other operation is to query the counters to get the sum of all the thread-local variables.
......@@ -42,75 +46,100 @@
#include "toku_assert.h"
#include "partitioned_counter.h"
#include "memory.h"
#include "test.h"
// The test code includes the fastest version I could figure out to make, implemented below.
struct counter_s {
bool inited;
int counter;
volatile int counter;
struct counter_s *prev, *next;
int myid;
};
static __thread struct counter_s counter = {false,0, NULL,NULL,0};
static int finished_counter=0; // counter for all threads that are done.
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static struct counter_s *head=NULL, *tail=NULL;
// We use a single mutex for anything complex. We'd like to use a mutex per partitioned counter, but we must cope with the possibility of a race between
// a terminating pthread (which calls destroy_counter()), and a call to the counter destructor. So we use a global mutex.
static pthread_mutex_t pc_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct counter_s *head=NULL;
static pthread_key_t counter_key;
static void destroy_counter (void *counterp) {
static void pc_lock (void)
// Effect: Lock the pc mutex.
{
int r = pthread_mutex_lock(&pc_mutex);
assert(r==0);
}
static void pc_unlock (void)
// Effect: Unlock the pc mutex.
{
int r = pthread_mutex_unlock(&pc_mutex);
assert(r==0);
}
static void destroy_counter (void *counterp)
// Effect: This is the function passed to pthread_key_create that is to run whenever a thread terminates.
// The thread-local part of the counter must be copied into the shared state, and the thread-local part of the counter must be
// removed from the linked list of all thread-local parts.
{
assert((struct counter_s*)counterp==&counter);
{ int r = pthread_mutex_lock(&mutex); assert(r==0); }
pc_lock();
if (counter.prev==NULL) {
assert(head==&counter);
head = counter.next;
} else {
counter.prev->next = counter.next;
}
if (counter.next==NULL) {
assert(tail==&counter);
tail = counter.prev;
} else {
if (counter.next!=NULL) {
counter.next->prev = counter.prev;
}
finished_counter += counter.counter;
HELGRIND_VALGRIND_HG_ENABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // stop ignoring races
//printf("finished counter now %d\n", finished_counter);
{ int r = pthread_mutex_unlock(&mutex); assert(r==0); }
pc_unlock();
}
static int idcounter=0;
static inline void increment (void) {
if (!counter.inited) {
{ int r = pthread_mutex_lock(&mutex); assert(r==0); }
{ int r = pthread_setspecific(counter_key, &counter); assert(r==0); }
counter.prev = tail;
counter.next = NULL;
if (head==NULL) {
head = &counter;
tail = &counter;
} else {
tail->next = &counter;
tail = &counter;
pc_lock();
struct counter_s *cp = &counter;
{ int r = pthread_setspecific(counter_key, cp); assert(r==0); }
cp->prev = NULL;
cp->next = head;
if (head!=NULL) {
head->prev = cp;
}
counter.counter = 0;
counter.inited = true;
counter.myid = idcounter++;
head = cp;
#ifdef __INTEL_COMPILER
__memory_barrier(); // for some reason I don't understand, ICC needs a memory barrier here. -Bradley
#endif
cp->counter = 0;
cp->inited = true;
cp->myid = idcounter++;
HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&counter.counter, sizeof(counter.counter)); // the counter increment is kind of racy.
{ int r = pthread_mutex_unlock(&mutex); assert(r==0); }
pc_unlock();
}
counter.counter++;
}
static int getvals (void) {
{ int r = pthread_mutex_lock(&mutex); assert(r==0); }
pc_lock();
int sum=finished_counter;
for (struct counter_s *p=head; p; p=p->next) {
sum+=p->counter;
}
{ int r = pthread_mutex_unlock(&mutex); assert(r==0); }
pc_unlock();
return sum;
}
/**********************************************************************************/
/* And now for some actual test code. */
/**********************************************************************************/
static const int N=10000000;
static const int T=20;
......@@ -189,7 +218,33 @@ static void timeit (const char *description, void* (*f)(void*)) {
pt_join(threads[i], NULL);
}
gettimeofday(&end, 0);
printf("%-9s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
}
// Do a measurement where it really is only a pointer dereference to increment the variable, which is thread local.
static void* tl_doit_ptr (void *v) {
volatile uint64_t *p = (uint64_t *)v;
for (int i=0; i<N; i++) {
(*p)++;
}
return v;
}
static void timeit_with_thread_local_pointer (const char *description, void* (*f)(void*)) {
struct timeval start, end;
pthread_t threads[T];
struct { uint64_t values[8] __attribute__((__aligned__(64))); } values[T]; // pad to different cache lines.
gettimeofday(&start, 0);
for (int i=0; i<T; i++) {
values[i].values[0]=0;
pt_create(&threads[i], f, &values[i].values[0]);
}
for (int i=0; i<T; i++) {
pt_join(threads[i], &values[i].values[0]);
}
gettimeofday(&end, 0);
printf("%-10s Time=%.6fs (%7.3fns per increment)\n", description, tdiff(&start, &end), (1e9*tdiff(&start, &end)/T)/N);
}
static int verboseness_cmdarg=0;
......@@ -211,93 +266,116 @@ static void parse_args (int argc, const char *argv[]) {
static void do_timeit (void) {
{ int r = pthread_key_create(&counter_key, destroy_counter); assert(r==0); }
pc = create_partitioned_counter();
printf("%d threads\n%d increments per thread\n", T, N);
timeit("++", old_doit_nonatomic);
timeit("atomic++", old_doit);
timeit("fast", new_doit);
timeit("puretl", tl_doit);
timeit_with_thread_local_pointer("puretl-ptr", tl_doit_ptr);
pc = create_partitioned_counter();
timeit("pc", pc_doit);
destroy_partitioned_counter(pc);
}
struct test_arguments {
PARTITIONED_COUNTER pc;
unsigned long limit;
unsigned long total_increment_per_writer;
volatile unsigned long unfinished_count;
uint64_t limit;
uint64_t total_increment_per_writer;
volatile uint64_t unfinished_count;
};
static void *reader_test_fun (void *ta_v) {
struct test_arguments *ta = (struct test_arguments *)ta_v;
unsigned long lastval = 0;
printf("reader starting\n");
uint64_t lastval = 0;
while (ta->unfinished_count>0) {
unsigned long thisval = read_partitioned_counter(ta->pc);
uint64_t thisval = read_partitioned_counter(ta->pc);
assert(lastval <= thisval);
assert(thisval <= ta->limit);
lastval = thisval;
if (verboseness_cmdarg && (0==(thisval & (thisval-1)))) printf("Thisval=%ld\n", thisval);
if (verboseness_cmdarg && (0==(thisval & (thisval-1)))) printf("ufc=%ld Thisval=%ld\n", ta->unfinished_count,thisval);
}
unsigned long thisval = read_partitioned_counter(ta->pc);
uint64_t thisval = read_partitioned_counter(ta->pc);
assert(thisval==ta->limit);
return ta_v;
}
static void *writer_test_fun (void *ta_v) {
struct test_arguments *ta = (struct test_arguments *)ta_v;
printf("writer starting\n");
for (unsigned long i=0; i<ta->total_increment_per_writer; i++) {
for (uint64_t i=0; i<ta->total_increment_per_writer; i++) {
if (i%1000 == 0) sched_yield();
increment_partitioned_counter(ta->pc, 1);
}
printf("writer done\n");
__sync_fetch_and_sub(&ta->unfinished_count, 1);
uint64_t c __attribute__((__unused__)) = __sync_fetch_and_sub(&ta->unfinished_count, 1);
return ta_v;
}
static void do_testit (void) {
const int NGROUPS = 2;
PARTITIONED_COUNTER pcs[NGROUPS];
unsigned long limits[NGROUPS];
uint64_t limits[NGROUPS];
limits [0] = 2000000;
limits [1] = 1000000;
unsigned long n_writers[NGROUPS];
uint64_t n_writers[NGROUPS];
n_writers[0] = 20;
n_writers[1] = 40;
struct test_arguments tas[NGROUPS];
pthread_t reader_threads[NGROUPS];
pthread_t *writer_threads[NGROUPS];
for (int i=0; i<NGROUPS; i++) {
pcs[i] = create_partitioned_counter();
tas[i].pc = pcs[i];
tas[i].pc = create_partitioned_counter();
tas[i].limit = limits[i];
tas[i].unfinished_count = n_writers[i];
tas[i].total_increment_per_writer = limits[i]/n_writers[i];
assert(tas[i].total_increment_per_writer * n_writers[i] == limits[i]);
pt_create(&reader_threads[i], reader_test_fun, &tas[i]);
MALLOC_N(n_writers[i], writer_threads[i]);
for (unsigned long j=0; j<n_writers[i] ; j++) {
for (uint64_t j=0; j<n_writers[i] ; j++) {
pt_create(&writer_threads[i][j], writer_test_fun, &tas[i]);
}
}
for (int i=0; i<NGROUPS; i++) {
pt_join(reader_threads[i], &tas[i]);
for (unsigned long j=0; j<n_writers[i] ; j++) {
for (uint64_t j=0; j<n_writers[i] ; j++) {
pt_join(writer_threads[i][j], &tas[i]);
}
toku_free(writer_threads[i]);
destroy_partitioned_counter(pcs[i]);
destroy_partitioned_counter(tas[i].pc);
}
}
int main (int argc, const char *argv[]) {
volatile int spinwait=0;
static void* test2_fun (void* mypc_v) {
PARTITIONED_COUNTER mypc = (PARTITIONED_COUNTER)mypc_v;
increment_partitioned_counter(mypc, 3);
spinwait=1;
while (spinwait==1);
// mypc no longer points at a valid data structure.
return NULL;
}
static void do_testit2 (void)
// This test checks to see what happens if a thread is still live when we destruct a counter.
// A thread increments the counter, then lets us know through a spin wait, then waits until we destroy the counter.
{
pthread_t t;
{
PARTITIONED_COUNTER mypc = create_partitioned_counter();
pt_create(&t, test2_fun, mypc);
while(spinwait==0); // wait until he incremented the counter.
assert(read_partitioned_counter(mypc)==3);
destroy_partitioned_counter(mypc);
} // leave scope, so the counter goes away.
spinwait=2; // tell the other guy to finish up.
pt_join(t, NULL);
}
int test_main (int argc, const char *argv[]) {
parse_args(argc, argv);
if (time_cmdarg) {
do_timeit();
} else {
do_testit();
do_testit2();
}
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment