Commit 78362ced authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

closes #5464 merge partitioned counters as ft-ops.cc status variables to main

git-svn-id: file:///svn/toku/tokudb@47804 c7de825b-a66e-492c-adef-691d508d4ae1
parent 921abd7d
...@@ -627,7 +627,9 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { ...@@ -627,7 +627,9 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf(" UINT64, // interpret as uint64_t \n"); printf(" UINT64, // interpret as uint64_t \n");
printf(" CHARSTR, // interpret as char * \n"); printf(" CHARSTR, // interpret as char * \n");
printf(" UNIXTIME, // interpret as time_t \n"); printf(" UNIXTIME, // interpret as time_t \n");
printf(" TOKUTIME // interpret as tokutime_t \n"); printf(" TOKUTIME, // interpret as tokutime_t \n");
printf(" PARCOUNT, // interpret as PARTITIONED_COUNTER\n");
printf(" MAXCOUNT // interpret as MAX_PARTITIONED_COUNTER\n");
printf("} toku_engine_status_display_type; \n"); printf("} toku_engine_status_display_type; \n");
printf("typedef struct __toku_engine_status_row {\n"); printf("typedef struct __toku_engine_status_row {\n");
...@@ -637,6 +639,8 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) { ...@@ -637,6 +639,8 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf(" union { \n"); printf(" union { \n");
printf(" uint64_t num; \n"); printf(" uint64_t num; \n");
printf(" const char * str; \n"); printf(" const char * str; \n");
printf(" struct partitioned_counter *parcount;\n");
printf(" struct max_partitioned_counter *maxcount;\n");
printf(" } value; \n"); printf(" } value; \n");
printf("} * TOKU_ENGINE_STATUS_ROW, TOKU_ENGINE_STATUS_ROW_S; \n"); printf("} * TOKU_ENGINE_STATUS_ROW, TOKU_ENGINE_STATUS_ROW_S; \n");
......
...@@ -953,9 +953,9 @@ typedef enum { ...@@ -953,9 +953,9 @@ typedef enum {
FT_PARTIAL_EVICTIONS_NONLEAF, // number of nonleaf node partial evictions FT_PARTIAL_EVICTIONS_NONLEAF, // number of nonleaf node partial evictions
FT_PARTIAL_EVICTIONS_LEAF, // number of leaf node partial evictions FT_PARTIAL_EVICTIONS_LEAF, // number of leaf node partial evictions
FT_MSN_DISCARDS, // how many messages were ignored by leaf because of msn FT_MSN_DISCARDS, // how many messages were ignored by leaf because of msn
FT_MAX_WORKDONE, // max workdone value of any buffer //FT_MAX_WORKDONE, // max workdone value of any buffer
FT_TOTAL_RETRIES, // total number of search retries due to TRY_AGAIN FT_TOTAL_RETRIES, // total number of search retries due to TRY_AGAIN
FT_MAX_SEARCH_EXCESS_RETRIES, // max number of excess search retries (retries - treeheight) due to TRY_AGAIN //FT_MAX_SEARCH_EXCESS_RETRIES, // max number of excess search retries (retries - treeheight) due to TRY_AGAIN
FT_SEARCH_TRIES_GT_HEIGHT, // number of searches that required more tries than the height of the tree FT_SEARCH_TRIES_GT_HEIGHT, // number of searches that required more tries than the height of the tree
FT_SEARCH_TRIES_GT_HEIGHTPLUS3, // number of searches that required more tries than the height of the tree plus three FT_SEARCH_TRIES_GT_HEIGHTPLUS3, // number of searches that required more tries than the height of the tree plus three
FT_DISK_FLUSH_LEAF, // number of leaf nodes flushed to disk, not for checkpoint FT_DISK_FLUSH_LEAF, // number of leaf nodes flushed to disk, not for checkpoint
...@@ -969,7 +969,7 @@ typedef enum { ...@@ -969,7 +969,7 @@ typedef enum {
FT_MSG_BYTES_IN, // how many bytes of messages injected at root (for all trees) FT_MSG_BYTES_IN, // how many bytes of messages injected at root (for all trees)
FT_MSG_BYTES_OUT, // how many bytes of messages flushed from h1 nodes to leaves FT_MSG_BYTES_OUT, // how many bytes of messages flushed from h1 nodes to leaves
FT_MSG_BYTES_CURR, // how many bytes of messages currently in trees (estimate) FT_MSG_BYTES_CURR, // how many bytes of messages currently in trees (estimate)
FT_MSG_BYTES_MAX, // how many bytes of messages currently in trees (estimate) //FT_MSG_BYTES_MAX, // how many bytes of messages currently in trees (estimate)
FT_MSG_NUM, // how many messages injected at root FT_MSG_NUM, // how many messages injected at root
FT_MSG_NUM_BROADCAST, // how many broadcast messages injected at root FT_MSG_NUM_BROADCAST, // how many broadcast messages injected at root
FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, // how many basement nodes were decompressed because they were the target of a query FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, // how many basement nodes were decompressed because they were the target of a query
......
...@@ -146,11 +146,14 @@ static const uint32_t this_version = FT_LAYOUT_VERSION; ...@@ -146,11 +146,14 @@ static const uint32_t this_version = FT_LAYOUT_VERSION;
*/ */
static FT_STATUS_S ft_status; static FT_STATUS_S ft_status;
#define STATUS_INIT(k,t,l) { \ #define STATUS_INIT(k,t,l) do { \
ft_status.status[k].keyname = #k; \ ft_status.status[k].keyname = #k; \
ft_status.status[k].type = t; \ ft_status.status[k].type = t; \
ft_status.status[k].legend = "brt: " l; \ ft_status.status[k].legend = "brt: " l; \
} if (t == PARCOUNT) { \
ft_status.status[k].value.parcount = create_partitioned_counter(); \
} \
} while (0)
static toku_mutex_t ft_open_close_lock; static toku_mutex_t ft_open_close_lock;
...@@ -159,64 +162,68 @@ status_init(void) ...@@ -159,64 +162,68 @@ status_init(void)
{ {
// Note, this function initializes the keyname, type, and legend fields. // Note, this function initializes the keyname, type, and legend fields.
// Value fields are initialized to zero by compiler. // Value fields are initialized to zero by compiler.
STATUS_INIT(FT_UPDATES, UINT64, "dictionary updates"); STATUS_INIT(FT_UPDATES, PARCOUNT, "dictionary updates");
STATUS_INIT(FT_UPDATES_BROADCAST, UINT64, "dictionary broadcast updates"); STATUS_INIT(FT_UPDATES_BROADCAST, PARCOUNT, "dictionary broadcast updates");
STATUS_INIT(FT_DESCRIPTOR_SET, UINT64, "descriptor set"); STATUS_INIT(FT_DESCRIPTOR_SET, PARCOUNT, "descriptor set");
STATUS_INIT(FT_PARTIAL_EVICTIONS_NONLEAF, UINT64, "nonleaf node partial evictions"); STATUS_INIT(FT_PARTIAL_EVICTIONS_NONLEAF, PARCOUNT, "nonleaf node partial evictions");
STATUS_INIT(FT_PARTIAL_EVICTIONS_LEAF, UINT64, "leaf node partial evictions"); STATUS_INIT(FT_PARTIAL_EVICTIONS_LEAF, PARCOUNT, "leaf node partial evictions");
STATUS_INIT(FT_MSN_DISCARDS, UINT64, "messages ignored by leaf due to msn"); STATUS_INIT(FT_MSN_DISCARDS, PARCOUNT, "messages ignored by leaf due to msn");
STATUS_INIT(FT_MAX_WORKDONE, UINT64, "max workdone over all buffers"); //STATUS_INIT(FT_MAX_WORKDONE, UINT64, "max workdone over all buffers");
STATUS_INIT(FT_TOTAL_RETRIES, UINT64, "total search retries due to TRY_AGAIN"); STATUS_INIT(FT_TOTAL_RETRIES, PARCOUNT, "total search retries due to TRY_AGAIN");
STATUS_INIT(FT_MAX_SEARCH_EXCESS_RETRIES, UINT64, "max excess search retries (retries - tree height) due to TRY_AGAIN"); //STATUS_INIT(FT_MAX_SEARCH_EXCESS_RETRIES, UINT64, "max excess search retries (retries - tree height) due to TRY_AGAIN");
STATUS_INIT(FT_SEARCH_TRIES_GT_HEIGHT, UINT64, "searches requiring more tries than the height of the tree"); STATUS_INIT(FT_SEARCH_TRIES_GT_HEIGHT, PARCOUNT, "searches requiring more tries than the height of the tree");
STATUS_INIT(FT_SEARCH_TRIES_GT_HEIGHTPLUS3, UINT64, "searches requiring more tries than the height of the tree plus three"); STATUS_INIT(FT_SEARCH_TRIES_GT_HEIGHTPLUS3, PARCOUNT, "searches requiring more tries than the height of the tree plus three");
STATUS_INIT(FT_DISK_FLUSH_LEAF, UINT64, "leaf nodes flushed to disk (not for checkpoint)"); STATUS_INIT(FT_DISK_FLUSH_LEAF, PARCOUNT, "leaf nodes flushed to disk (not for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF, UINT64, "nonleaf nodes flushed to disk (not for checkpoint)"); STATUS_INIT(FT_DISK_FLUSH_NONLEAF, PARCOUNT, "nonleaf nodes flushed to disk (not for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, UINT64, "leaf nodes flushed to disk (for checkpoint)"); STATUS_INIT(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, PARCOUNT, "leaf nodes flushed to disk (for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, UINT64, "nonleaf nodes flushed to disk (for checkpoint)"); STATUS_INIT(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, PARCOUNT, "nonleaf nodes flushed to disk (for checkpoint)");
STATUS_INIT(FT_CREATE_LEAF, UINT64, "leaf nodes created"); STATUS_INIT(FT_CREATE_LEAF, PARCOUNT, "leaf nodes created");
STATUS_INIT(FT_CREATE_NONLEAF, UINT64, "nonleaf nodes created"); STATUS_INIT(FT_CREATE_NONLEAF, PARCOUNT, "nonleaf nodes created");
STATUS_INIT(FT_DESTROY_LEAF, UINT64, "leaf nodes destroyed"); STATUS_INIT(FT_DESTROY_LEAF, PARCOUNT, "leaf nodes destroyed");
STATUS_INIT(FT_DESTROY_NONLEAF, UINT64, "nonleaf nodes destroyed"); STATUS_INIT(FT_DESTROY_NONLEAF, PARCOUNT, "nonleaf nodes destroyed");
STATUS_INIT(FT_MSG_BYTES_IN, UINT64, "bytes of messages injected at root (all trees)"); STATUS_INIT(FT_MSG_BYTES_IN, PARCOUNT, "bytes of messages injected at root (all trees)");
STATUS_INIT(FT_MSG_BYTES_OUT, UINT64, "bytes of messages flushed from h1 nodes to leaves"); STATUS_INIT(FT_MSG_BYTES_OUT, PARCOUNT, "bytes of messages flushed from h1 nodes to leaves");
STATUS_INIT(FT_MSG_BYTES_CURR, UINT64, "bytes of messages currently in trees (estimate)"); STATUS_INIT(FT_MSG_BYTES_CURR, PARCOUNT, "bytes of messages currently in trees (estimate)");
STATUS_INIT(FT_MSG_BYTES_MAX, UINT64, "max bytes of messages ever in trees (estimate)"); //STATUS_INIT(FT_MSG_BYTES_MAX, UINT64, "max bytes of messages ever in trees (estimate)");
STATUS_INIT(FT_MSG_NUM, UINT64, "messages injected at root"); STATUS_INIT(FT_MSG_NUM, PARCOUNT, "messages injected at root");
STATUS_INIT(FT_MSG_NUM_BROADCAST, UINT64, "broadcast messages injected at root"); STATUS_INIT(FT_MSG_NUM_BROADCAST, PARCOUNT, "broadcast messages injected at root");
STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, UINT64, "basements decompressed as a target of a query"); STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, PARCOUNT, "basements decompressed as a target of a query");
STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE, UINT64, "basements decompressed for prelocked range"); STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE, PARCOUNT, "basements decompressed for prelocked range");
STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH, UINT64, "basements decompressed for prefetch"); STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH, PARCOUNT, "basements decompressed for prefetch");
STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE, UINT64, "basements decompressed for write"); STATUS_INIT(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE, PARCOUNT, "basements decompressed for write");
STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL, UINT64, "buffers decompressed as a target of a query"); STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL, PARCOUNT, "buffers decompressed as a target of a query");
STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, UINT64, "buffers decompressed for prelocked range"); STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, PARCOUNT, "buffers decompressed for prelocked range");
STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, UINT64, "buffers decompressed for prefetch"); STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, PARCOUNT, "buffers decompressed for prefetch");
STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, UINT64, "buffers decompressed for write"); STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, PARCOUNT, "buffers decompressed for write");
STATUS_INIT(FT_NUM_PIVOTS_FETCHED_QUERY, UINT64, "pivots fetched for query"); STATUS_INIT(FT_NUM_PIVOTS_FETCHED_QUERY, PARCOUNT, "pivots fetched for query");
STATUS_INIT(FT_NUM_PIVOTS_FETCHED_PREFETCH, UINT64, "pivots fetched for prefetch"); STATUS_INIT(FT_NUM_PIVOTS_FETCHED_PREFETCH, PARCOUNT, "pivots fetched for prefetch");
STATUS_INIT(FT_NUM_PIVOTS_FETCHED_WRITE, UINT64, "pivots fetched for write"); STATUS_INIT(FT_NUM_PIVOTS_FETCHED_WRITE, PARCOUNT, "pivots fetched for write");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_NORMAL, UINT64, "basements fetched as a target of a query"); STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_NORMAL, PARCOUNT, "basements fetched as a target of a query");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, UINT64, "basements fetched for prelocked range"); STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, PARCOUNT, "basements fetched for prelocked range");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_PREFETCH, UINT64, "basements fetched for prefetch"); STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_PREFETCH, PARCOUNT, "basements fetched for prefetch");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_WRITE, UINT64, "basements fetched for write"); STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_WRITE, PARCOUNT, "basements fetched for write");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, UINT64, "buffers fetched as a target of a query"); STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, PARCOUNT, "buffers fetched as a target of a query");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, UINT64, "buffers fetched for prelocked range"); STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, PARCOUNT, "buffers fetched for prelocked range");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, UINT64, "buffers fetched for prefetch"); STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, PARCOUNT, "buffers fetched for prefetch");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, UINT64, "buffers fetched for write"); STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write");
ft_status.initialized = true; ft_status.initialized = true;
} }
static void status_destroy(void) {
for (int i = 0; i < FT_STATUS_NUM_ROWS; ++i) {
if (ft_status.status[i].type == PARCOUNT) {
destroy_partitioned_counter(ft_status.status[i].value.parcount);
}
}
}
#undef STATUS_INIT #undef STATUS_INIT
void void
toku_ft_get_status(FT_STATUS s) { toku_ft_get_status(FT_STATUS s) {
if (!ft_status.initialized) {
status_init();
}
*s = ft_status; *s = ft_status;
} }
#define STATUS_VALUE(x) ft_status.status[x].value.num #define STATUS_INC(x, d) increment_partitioned_counter(ft_status.status[x].value.parcount, d)
bool is_entire_node_in_memory(FTNODE node) { bool is_entire_node_in_memory(FTNODE node) {
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
...@@ -585,18 +592,18 @@ toku_get_and_clear_basement_stats(FTNODE leafnode) { ...@@ -585,18 +592,18 @@ toku_get_and_clear_basement_stats(FTNODE leafnode) {
static void ft_status_update_flush_reason(FTNODE node, bool for_checkpoint) { static void ft_status_update_flush_reason(FTNODE node, bool for_checkpoint) {
if (node->height == 0) { if (node->height == 0) {
if (for_checkpoint) { if (for_checkpoint) {
__sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT), 1); STATUS_INC(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, 1);
} }
else { else {
__sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_LEAF), 1); STATUS_INC(FT_DISK_FLUSH_LEAF, 1);
} }
} }
else { else {
if (for_checkpoint) { if (for_checkpoint) {
__sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT), 1); STATUS_INC(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, 1);
} }
else { else {
__sync_fetch_and_add(&STATUS_VALUE(FT_DISK_FLUSH_NONLEAF), 1); STATUS_INC(FT_DISK_FLUSH_NONLEAF, 1);
} }
} }
} }
...@@ -755,11 +762,11 @@ void ...@@ -755,11 +762,11 @@ void
toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe) toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe)
{ {
if (bfe->type == ftnode_fetch_prefetch) { if (bfe->type == ftnode_fetch_prefetch) {
STATUS_VALUE(FT_NUM_PIVOTS_FETCHED_PREFETCH)++; STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1);
} else if (bfe->type == ftnode_fetch_all) { } else if (bfe->type == ftnode_fetch_all) {
STATUS_VALUE(FT_NUM_PIVOTS_FETCHED_WRITE)++; STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1);
} else if (bfe->type == ftnode_fetch_subset) { } else if (bfe->type == ftnode_fetch_subset) {
STATUS_VALUE(FT_NUM_PIVOTS_FETCHED_QUERY)++; STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1);
} }
} }
...@@ -892,7 +899,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* ...@@ -892,7 +899,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) == PT_AVAIL) { if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) { if (BP_SHOULD_EVICT(node,i)) {
STATUS_VALUE(FT_PARTIAL_EVICTIONS_NONLEAF)++; STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF, 1);
cilk_spawn compress_internal_node_partition(node, i, ft->h->compression_method); cilk_spawn compress_internal_node_partition(node, i, ft->h->compression_method);
} }
else { else {
...@@ -914,7 +921,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* ...@@ -914,7 +921,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
// Get rid of compressed stuff no matter what. // Get rid of compressed stuff no matter what.
if (BP_STATE(node,i) == PT_COMPRESSED) { if (BP_STATE(node,i) == PT_COMPRESSED) {
STATUS_VALUE(FT_PARTIAL_EVICTIONS_LEAF)++; STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF, 1);
SUB_BLOCK sb = BSB(node, i); SUB_BLOCK sb = BSB(node, i);
toku_free(sb->compressed_ptr); toku_free(sb->compressed_ptr);
toku_free(sb); toku_free(sb);
...@@ -923,7 +930,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR* ...@@ -923,7 +930,7 @@ int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATTR*
} }
else if (BP_STATE(node,i) == PT_AVAIL) { else if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) { if (BP_SHOULD_EVICT(node,i)) {
STATUS_VALUE(FT_PARTIAL_EVICTIONS_LEAF)++; STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF, 1);
toku_evict_bn_from_memory(node, i, ft); toku_evict_bn_from_memory(node, i, ft);
} }
else { else {
...@@ -1031,54 +1038,54 @@ ft_status_update_partial_fetch_reason( ...@@ -1031,54 +1038,54 @@ ft_status_update_partial_fetch_reason(
if (is_leaf) { if (is_leaf) {
if (bfe->type == ftnode_fetch_prefetch) { if (bfe->type == ftnode_fetch_prefetch) {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH)++; STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH, 1);
} else { } else {
STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_PREFETCH)++; STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1);
} }
} else if (bfe->type == ftnode_fetch_all) { } else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE)++; STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE, 1);
} else { } else {
STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_WRITE)++; STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1);
} }
} else if (i == bfe->child_to_read) { } else if (i == bfe->child_to_read) {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL)++; STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, 1);
} else { } else {
STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_NORMAL)++; STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1);
} }
} else { } else {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE)++; STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE, 1);
} else { } else {
STATUS_VALUE(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE)++; STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1);
} }
} }
} }
else { else {
if (bfe->type == ftnode_fetch_prefetch) { if (bfe->type == ftnode_fetch_prefetch) {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH)++; STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, 1);
} else { } else {
STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH)++; STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1);
} }
} else if (bfe->type == ftnode_fetch_all) { } else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE)++; STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, 1);
} else { } else {
STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_WRITE)++; STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1);
} }
} else if (i == bfe->child_to_read) { } else if (i == bfe->child_to_read) {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL)++; STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL, 1);
} else { } else {
STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_NORMAL)++; STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1);
} }
} else { } else {
if (state == PT_COMPRESSED) { if (state == PT_COMPRESSED) {
STATUS_VALUE(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE)++; STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, 1);
} else { } else {
STATUS_VALUE(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE)++; STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1);
} }
} }
} }
...@@ -1223,9 +1230,9 @@ void toku_ftnode_free (FTNODE *nodep) { ...@@ -1223,9 +1230,9 @@ void toku_ftnode_free (FTNODE *nodep) {
toku_mempool_destroy(mp); toku_mempool_destroy(mp);
} }
} }
STATUS_VALUE(FT_DESTROY_LEAF)++; STATUS_INC(FT_DESTROY_LEAF, 1);
} else { } else {
STATUS_VALUE(FT_DESTROY_NONLEAF)++; STATUS_INC(FT_DESTROY_NONLEAF, 1);
} }
toku_destroy_ftnode_internals(node); toku_destroy_ftnode_internals(node);
toku_free(node); toku_free(node);
...@@ -1240,9 +1247,9 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c ...@@ -1240,9 +1247,9 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c
assert(height >= 0); assert(height >= 0);
if (height == 0) if (height == 0)
STATUS_VALUE(FT_CREATE_LEAF)++; STATUS_INC(FT_CREATE_LEAF, 1);
else else
STATUS_VALUE(FT_CREATE_NONLEAF)++; STATUS_INC(FT_CREATE_NONLEAF, 1);
n->max_msn_applied_to_node_on_disk = ZERO_MSN; // correct value for root node, harmless for others n->max_msn_applied_to_node_on_disk = ZERO_MSN; // correct value for root node, harmless for others
n->flags = flags; n->flags = flags;
...@@ -1473,9 +1480,10 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1473,9 +1480,10 @@ toku_ft_bn_apply_cmd_once (
} }
} }
if (workdone) { // test programs may call with NULL if (workdone) { // test programs may call with NULL
uint64_t new_workdone = __sync_add_and_fetch(workdone, workdone_this_le); //uint64_t new_workdone =
if (new_workdone > STATUS_VALUE(FT_MAX_WORKDONE)) (void) __sync_add_and_fetch(workdone, workdone_this_le);
STATUS_VALUE(FT_MAX_WORKDONE) = new_workdone; //if (new_workdone > STATUS_VALUE(FT_MAX_WORKDONE))
// STATUS_VALUE(FT_MAX_WORKDONE) = new_workdone;
} }
// if we created a new mempool buffer, free the old one // if we created a new mempool buffer, free the old one
...@@ -1561,7 +1569,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn ...@@ -1561,7 +1569,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
if (cmd->type == FT_UPDATE) { if (cmd->type == FT_UPDATE) {
// key is passed in with command (should be same as from le) // key is passed in with command (should be same as from le)
// update function extra is passed in with command // update function extra is passed in with command
STATUS_VALUE(FT_UPDATES)++; STATUS_INC(FT_UPDATES, 1);
keyp = cmd->u.id.key; keyp = cmd->u.id.key;
update_function_extra = cmd->u.id.val; update_function_extra = cmd->u.id.val;
} else if (cmd->type == FT_UPDATE_BROADCAST_ALL) { } else if (cmd->type == FT_UPDATE_BROADCAST_ALL) {
...@@ -1570,7 +1578,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn ...@@ -1570,7 +1578,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
assert(le); // for broadcast updates, we just hit all leafentries assert(le); // for broadcast updates, we just hit all leafentries
// so this cannot be null // so this cannot be null
assert(cmd->u.id.key->size == 0); assert(cmd->u.id.key->size == 0);
STATUS_VALUE(FT_UPDATES_BROADCAST)++; STATUS_INC(FT_UPDATES_BROADCAST, 1);
keyp = toku_fill_dbt(&key, le_key(le), le_keylen(le)); keyp = toku_fill_dbt(&key, le_key(le), le_keylen(le));
update_function_extra = cmd->u.id.val; update_function_extra = cmd->u.id.val;
} else { } else {
...@@ -2274,9 +2282,9 @@ toku_bnc_flush_to_child( ...@@ -2274,9 +2282,9 @@ toku_bnc_flush_to_child(
live_root_txns live_root_txns
); );
size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer); size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer);
STATUS_VALUE(FT_MSG_BYTES_OUT) += buffsize; STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
// may be misleading if there's a broadcast message in there // may be misleading if there's a broadcast message in there
STATUS_VALUE(FT_MSG_BYTES_CURR) -= buffsize; STATUS_INC(FT_MSG_BYTES_CURR, -buffsize);
// Perform the garbage collection. // Perform the garbage collection.
ft_leaf_gc_all_les(child, h, snapshot_txnids, referenced_xids, live_root_txns); ft_leaf_gc_all_les(child, h, snapshot_txnids, referenced_xids, live_root_txns);
...@@ -2401,7 +2409,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2401,7 +2409,7 @@ void toku_ft_leaf_apply_cmd(
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
STATUS_VALUE(FT_MSN_DISCARDS)++; STATUS_INC(FT_MSN_DISCARDS, 1);
} }
} }
else if (ft_msg_applies_all(cmd)) { else if (ft_msg_applies_all(cmd)) {
...@@ -2416,7 +2424,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2416,7 +2424,7 @@ void toku_ft_leaf_apply_cmd(
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
STATUS_VALUE(FT_MSN_DISCARDS)++; STATUS_INC(FT_MSN_DISCARDS, 1);
} }
} }
} }
...@@ -2457,14 +2465,14 @@ static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd) ...@@ -2457,14 +2465,14 @@ static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd)
// update some status variables // update some status variables
if (node->height != 0) { if (node->height != 0) {
uint64_t msgsize = ft_msg_size(cmd); uint64_t msgsize = ft_msg_size(cmd);
STATUS_VALUE(FT_MSG_BYTES_IN) += msgsize; STATUS_INC(FT_MSG_BYTES_IN, msgsize);
STATUS_VALUE(FT_MSG_BYTES_CURR) += msgsize; STATUS_INC(FT_MSG_BYTES_CURR, msgsize);
if (STATUS_VALUE(FT_MSG_BYTES_CURR) > STATUS_VALUE(FT_MSG_BYTES_MAX)) { //if (STATUS_VALUE(FT_MSG_BYTES_CURR) > STATUS_VALUE(FT_MSG_BYTES_MAX)) {
STATUS_VALUE(FT_MSG_BYTES_MAX) = STATUS_VALUE(FT_MSG_BYTES_CURR); // STATUS_VALUE(FT_MSG_BYTES_MAX) = STATUS_VALUE(FT_MSG_BYTES_CURR);
} //}
STATUS_VALUE(FT_MSG_NUM)++; STATUS_INC(FT_MSG_NUM, 1);
if (ft_msg_applies_all(cmd)) { if (ft_msg_applies_all(cmd)) {
STATUS_VALUE(FT_MSG_NUM_BROADCAST)++; STATUS_INC(FT_MSG_NUM_BROADCAST, 1);
} }
} }
} }
...@@ -3137,7 +3145,7 @@ toku_ft_change_descriptor( ...@@ -3137,7 +3145,7 @@ toku_ft_change_descriptor(
toku_ft_update_descriptor(ft_h->ft, &new_d); toku_ft_update_descriptor(ft_h->ft, &new_d);
// very infrequent operation, worth precise threadsafe count // very infrequent operation, worth precise threadsafe count
if (r == 0) { if (r == 0) {
STATUS_VALUE(FT_DESCRIPTOR_SET)++; STATUS_INC(FT_DESCRIPTOR_SET, 1);
} }
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
...@@ -3848,7 +3856,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, str ...@@ -3848,7 +3856,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, FTNODE ancestor, int childnum, str
stats_to_update stats_to_update
); );
} else { } else {
STATUS_VALUE(FT_MSN_DISCARDS)++; STATUS_INC(FT_MSN_DISCARDS, 1);
} }
// We must always mark entry as stale since it has been marked // We must always mark entry as stale since it has been marked
// (using omt::iterate_and_mark_range) // (using omt::iterate_and_mark_range)
...@@ -4906,14 +4914,16 @@ try_again: ...@@ -4906,14 +4914,16 @@ try_again:
} }
{ // accounting (to detect and measure thrashing) { // accounting (to detect and measure thrashing)
uint retrycount = trycount - 1; // how many retries were needed? uint retrycount = trycount - 1; // how many retries were needed?
if (retrycount) STATUS_VALUE(FT_TOTAL_RETRIES) += retrycount; if (retrycount) {
STATUS_INC(FT_TOTAL_RETRIES, retrycount);
}
if (retrycount > tree_height) { // if at least one node was read from disk more than once if (retrycount > tree_height) { // if at least one node was read from disk more than once
STATUS_VALUE(FT_SEARCH_TRIES_GT_HEIGHT)++; STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHT, 1);
uint excess_tries = retrycount - tree_height; //uint excess_tries = retrycount - tree_height;
if (excess_tries > STATUS_VALUE(FT_MAX_SEARCH_EXCESS_RETRIES)) //if (excess_tries > STATUS_VALUE(FT_MAX_SEARCH_EXCESS_RETRIES))
STATUS_VALUE(FT_MAX_SEARCH_EXCESS_RETRIES) = excess_tries; // STATUS_VALUE(FT_MAX_SEARCH_EXCESS_RETRIES) = excess_tries;
if (retrycount > (tree_height+3)) if (retrycount > (tree_height+3))
STATUS_VALUE(FT_SEARCH_TRIES_GT_HEIGHTPLUS3)++; STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHTPLUS3, 1);
} }
} }
return r; return r;
...@@ -5549,6 +5559,7 @@ int toku_ft_layer_init(void) { ...@@ -5549,6 +5559,7 @@ int toku_ft_layer_init(void) {
if (r) { goto exit; } if (r) { goto exit; }
partitioned_counters_init(); partitioned_counters_init();
status_init();
toku_checkpoint_init(); toku_checkpoint_init();
toku_ft_serialize_layer_init(); toku_ft_serialize_layer_init();
toku_mutex_init(&ft_open_close_lock, NULL); toku_mutex_init(&ft_open_close_lock, NULL);
...@@ -5560,6 +5571,7 @@ void toku_ft_layer_destroy(void) { ...@@ -5560,6 +5571,7 @@ void toku_ft_layer_destroy(void) {
toku_mutex_destroy(&ft_open_close_lock); toku_mutex_destroy(&ft_open_close_lock);
toku_ft_serialize_layer_destroy(); toku_ft_serialize_layer_destroy();
toku_checkpoint_destroy(); toku_checkpoint_destroy();
status_destroy();
partitioned_counters_destroy(); partitioned_counters_destroy();
//Portability must be cleaned up last //Portability must be cleaned up last
toku_portability_destroy(); toku_portability_destroy();
...@@ -5751,4 +5763,4 @@ toku_ft_helgrind_ignore(void) { ...@@ -5751,4 +5763,4 @@ toku_ft_helgrind_ignore(void) {
HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&ft_status, sizeof ft_status); HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&ft_status, sizeof ft_status);
} }
#undef STATUS_VALUE #undef STATUS_INC
...@@ -253,18 +253,16 @@ static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, Gr ...@@ -253,18 +253,16 @@ static inline struct local_counter *get_thread_local_counter(uint64_t pc_key, Gr
} }
} }
void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount) static struct local_counter *get_or_alloc_thread_local_counter(PARTITIONED_COUNTER pc)
// Effect: Increment the counter by amount.
// Requires: No overflows. This is a 64-bit unsigned counter.
{ {
// Only this thread is allowed to modify thread_local_array, except for setting tla->array[pc_key] to NULL // Only this thread is allowed to modify thread_local_array, except for setting tla->array[pc_key] to NULL
// when a counter is destroyed (and in that case there should be no race because no other thread should be // when a counter is destroyed (and in that case there should be no race because no other thread should be
// trying to access the same local counter at the same time. // trying to access the same local counter at the same time.
uint64_t pc_key = pc->pc_key; uint64_t pc_key = pc->pc_key;
struct local_counter *lc = get_thread_local_counter(pc_key, &thread_local_array); struct local_counter *lc = get_thread_local_counter(pc->pc_key, &thread_local_array);
if (lc==NULL) { if (__builtin_expect(!!(lc == NULL), 0)) {
XMALLOC(lc); // Might as well do the malloc without holding the pc lock. But most of the rest of this work needs the lock. XMALLOC(lc); // Might as well do the malloc without holding the pc lock. But most of the rest of this work needs the lock.
pc_lock(); pc_lock();
// Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters. // Set things up so that this thread terminates, the thread-local parts of the counter will be destroyed and merged into their respective counters.
if (!thread_local_array_inited) { if (!thread_local_array_inited) {
...@@ -274,9 +272,9 @@ void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount) ...@@ -274,9 +272,9 @@ void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array); all_thread_local_arrays.insert(&thread_local_ll_elt, &thread_local_array);
} }
lc->sum = 0; lc->sum = 0;
HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy. HELGRIND_VALGRIND_HG_DISABLE_CHECKING(&lc->sum, sizeof(lc->sum)); // the counter increment is kind of racy.
lc->owner_pc = pc; lc->owner_pc = pc;
lc->thread_local_array = &thread_local_array; lc->thread_local_array = &thread_local_array;
// Grow the array if needed, filling in NULLs // Grow the array if needed, filling in NULLs
...@@ -285,8 +283,16 @@ void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount) ...@@ -285,8 +283,16 @@ void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
} }
thread_local_array.store_unchecked(pc_key, lc); thread_local_array.store_unchecked(pc_key, lc);
pc->ll_counter_head.insert(&lc->ll_in_counter, lc); pc->ll_counter_head.insert(&lc->ll_in_counter, lc);
pc_unlock(); pc_unlock();
} }
return lc;
}
void increment_partitioned_counter(PARTITIONED_COUNTER pc, uint64_t amount)
// Effect: Increment the counter by amount.
// Requires: No overflows. This is a 64-bit unsigned counter.
{
struct local_counter *lc = get_or_alloc_thread_local_counter(pc);
lc->sum += amount; lc->sum += amount;
} }
...@@ -322,9 +328,8 @@ void partitioned_counters_destroy(void) ...@@ -322,9 +328,8 @@ void partitioned_counters_destroy(void)
while (all_thread_local_arrays.pop(&a_ll)) { while (all_thread_local_arrays.pop(&a_ll)) {
a_ll->get_container()->deinit(); a_ll->get_container()->deinit();
} }
pk_delete(thread_destructor_key); pk_delete(thread_destructor_key);
destroy_counters(); destroy_counters();
pc_unlock(); pc_unlock();
} }
...@@ -41,6 +41,7 @@ const char *toku_copyright_string = "Copyright (c) 2007-2012 Tokutek Inc. All r ...@@ -41,6 +41,7 @@ const char *toku_copyright_string = "Copyright (c) 2007-2012 Tokutek Inc. All r
#include "ydb_write.h" #include "ydb_write.h"
#include "ydb_txn.h" #include "ydb_txn.h"
#include "ft/txn_manager.h" #include "ft/txn_manager.h"
#include "ft/partitioned_counter.h"
// Include ydb_lib.cc here so that its constructor/destructor gets put into // Include ydb_lib.cc here so that its constructor/destructor gets put into
// ydb.o, to make sure they don't get erased at link time (when linking to // ydb.o, to make sure they don't get erased at link time (when linking to
...@@ -2083,6 +2084,18 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) { ...@@ -2083,6 +2084,18 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
n += snprintf(buff + n, bufsiz - n, "%.6f\n", t); n += snprintf(buff + n, bufsiz - n, "%.6f\n", t);
} }
break; break;
case PARCOUNT:
{
uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
}
#if 0
case MAXCOUNT:
{
uint64_t v = read_max_partitioned_counter(mystat[row].value.maxcount);
n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
}
#endif
default: default:
n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type); n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type);
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment