Commit 62c43ec6 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

back out change


git-svn-id: file:///svn/toku/tokudb@51266 c7de825b-a66e-492c-adef-691d508d4ae1
parent 93515995
...@@ -42,6 +42,6 @@ int ...@@ -42,6 +42,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
...@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
} }
int r = 0; int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf); ZERO_ARRAY(valbuf);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_zeroed_array(valbuf, arg->cli->val_size,
arg->random_data, arg->cli->compressibility);
struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra); struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra);
uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1); uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
fill_key_buf(pk, keybuf, arg->cli); dbt_init(&mult_key_dbt[0], &pk, sizeof pk);
fill_val_buf_random(arg->random_data, valbuf, arg->cli); dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
r = env->put_multiple( r = env->put_multiple(
env, env,
dbs[0], // source db. dbs[0], // source db.
...@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) { ...@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) {
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
} }
args.env_args.generate_put_callback = iibench_generate_row_for_put; args.env_args.generate_put_callback = iibench_generate_row_for_put;
perf_test_main(&args); stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp);
return 0; return 0;
} }
...@@ -51,6 +51,6 @@ test_main(int argc, char *const argv[]) { ...@@ -51,6 +51,6 @@ test_main(int argc, char *const argv[]) {
if (args.num_put_threads > 1) { if (args.num_put_threads > 1) {
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
} }
perf_test_main(&args); stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp);
return 0; return 0;
} }
...@@ -20,13 +20,6 @@ ...@@ -20,13 +20,6 @@
// The intent of this test is to measure the throughput of malloc and free // The intent of this test is to measure the throughput of malloc and free
// with multiple threads. // with multiple threads.
static int xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = toku_xmalloc(s);
toku_free(p);
return 0;
}
static void static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
...@@ -34,7 +27,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { ...@@ -34,7 +27,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args); arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = xmalloc_free_op; myargs[i].operation = malloc_free_op;
} }
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args); run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
} }
...@@ -43,6 +36,6 @@ int ...@@ -43,6 +36,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
...@@ -18,10 +18,6 @@ ...@@ -18,10 +18,6 @@
// The intent of this test is to measure the throughput of the test infrastructure executing a nop // The intent of this test is to measure the throughput of the test infrastructure executing a nop
// on multiple threads. // on multiple threads.
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static void static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
...@@ -38,6 +34,6 @@ int ...@@ -38,6 +34,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
...@@ -62,6 +62,6 @@ int ...@@ -62,6 +62,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
...@@ -75,6 +75,6 @@ int ...@@ -75,6 +75,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
...@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) { ...@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) {
args.num_update_threads = 1; args.num_update_threads = 1;
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
...@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) { ...@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) {
// this test is all about transactions, make the DB small // this test is all about transactions, make the DB small
args.num_elements = 1; args.num_elements = 1;
args.num_DBs= 1; args.num_DBs= 1;
perf_test_main(&args); stress_test_main(&args);
return 0; return 0;
} }
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of toku_malloc and toku_free
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = xmalloc_free_op;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
stress_test_main(&args);
return 0;
}
...@@ -22,14 +22,6 @@ ...@@ -22,14 +22,6 @@
// This test is targetted at stressing the locktree, hence the small table and many update threads. // This test is targetted at stressing the locktree, hence the small table and many update threads.
// //
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
invariant_null(operation_extra);
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static void static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
...@@ -48,8 +40,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -48,8 +40,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[0].operation_extra = &soe[0]; myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
myargs[1].sleep_ms = 15L * 1000; // make the lock escalation thread.
myargs[1].operation_extra = nullptr; // it should sleep somewhere between 10 and 20
// seconds between each escalation.
struct lock_escalation_op_extra eoe;
eoe.min_sleep_time_micros = 10UL * (1000 * 1000);
eoe.max_sleep_time_micros = 20UL * (1000 * 1000);
myargs[1].operation_extra = &eoe;
myargs[1].operation = lock_escalation_op; myargs[1].operation = lock_escalation_op;
// make the threads that update the db // make the threads that update the db
......
...@@ -69,15 +69,9 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -69,15 +69,9 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
DBT dest_vals[2]; DBT dest_vals[2];
memset(dest_keys, 0, sizeof(dest_keys)); memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals)); memset(dest_vals, 0, sizeof(dest_vals));
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf),
dbt_init(&val, valbuf, sizeof valbuf),
r = env->txn_begin(env, NULL, &hi_txn, 0); CKERR(r);
int i; int i;
r = env->txn_begin(env, NULL, &hi_txn, 0);
CKERR(r);
for (i = 0; i < 1000; i++) { for (i = 0; i < 1000; i++) {
DB* dbs[2]; DB* dbs[2];
toku_mutex_lock(&hi_lock); toku_mutex_lock(&hi_lock);
...@@ -85,8 +79,11 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -85,8 +79,11 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
dbs[1] = hot_db; dbs[1] = hot_db;
int num_dbs = hot_db ? 2 : 1; int num_dbs = hot_db ? 2 : 1;
// do a random insertion // do a random insertion
fill_key_buf_random(arg->random_data, keybuf, arg); int rand_key = random() % arg->cli->num_elements;
fill_val_buf_random(arg->random_data, valbuf, arg->cli); int rand_val = random();
DBT key, val;
dbt_init(&key, &rand_key, sizeof(rand_key)),
dbt_init(&val, &rand_val, sizeof(rand_val)),
r = env->put_multiple( r = env->put_multiple(
env, env,
db, db,
......
...@@ -52,11 +52,6 @@ memalign(size_t UU(alignment), size_t size) ...@@ -52,11 +52,6 @@ memalign(size_t UU(alignment), size_t size)
# endif # endif
#endif #endif
#define MIN_VAL_SIZE sizeof(int64_t)
#define MIN_KEY_SIZE sizeof(int64_t)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
volatile bool run_test; // should be volatile since we are communicating through this variable. volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG; typedef struct arg *ARG;
...@@ -92,6 +87,9 @@ enum perf_output_format { ...@@ -92,6 +87,9 @@ enum perf_output_format {
HUMAN = 0, HUMAN = 0,
CSV, CSV,
TSV, TSV,
#if 0
GNUPLOT,
#endif
NUM_OUTPUT_FORMATS NUM_OUTPUT_FORMATS
}; };
...@@ -155,6 +153,8 @@ struct arg { ...@@ -155,6 +153,8 @@ struct arg {
bool prelock_updates; bool prelock_updates;
}; };
DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->cli = cli_args; arg->cli = cli_args;
arg->dbp = dbp; arg->dbp = dbp;
...@@ -163,7 +163,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl ...@@ -163,7 +163,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->sleep_ms = 0; arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE; arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->operation_extra = nullptr; arg->operation_extra = NULL;
arg->do_prepare = false; arg->do_prepare = false;
arg->prelock_updates = false; arg->prelock_updates = false;
} }
...@@ -174,14 +174,12 @@ enum operation_type { ...@@ -174,14 +174,12 @@ enum operation_type {
PTQUERIES, PTQUERIES,
NUM_OPERATION_TYPES NUM_OPERATION_TYPES
}; };
const char *operation_names[] = { const char *operation_names[] = {
"ops", "ops",
"puts", "puts",
"ptqueries", "ptqueries",
nullptr NULL
}; };
static void increment_counter(void *extra, enum operation_type type, uint64_t inc) { static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
invariant(type != OPERATION); invariant(type != OPERATION);
int t = (int) type; int t = (int) type;
...@@ -401,6 +399,45 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con ...@@ -401,6 +399,45 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con
printf("\n"); printf("\n");
} }
#if 0
static void
gnuplot_print_perf_header(const struct cli_args *cli_args, const int num_threads)
{
printf("set terminal postscript solid color\n");
printf("set output \"foo.eps\"\n");
printf("set xlabel \"seconds\"\n");
printf("set xrange [0:*]\n");
printf("set ylabel \"X/s\"\n");
printf("plot ");
if (cli_args->print_thread_performance) {
for (int t = 1; t <= num_threads; ++t) {
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * ((t - 1) * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Thread %d %s\", ", col, t, operation_names[op]);
printf("'-' u 1:%d w lines t \"Thread %d %s/s\", ", col + 1, t, operation_names[op]);
}
}
}
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * (num_threads * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Total %s\", ", col);
printf("'-' u 1:%d w lines t \"Total %s/s\"%s", col + 1, operation_names[op], op == ((int) NUM_OPERATION_TYPES - 1) ? "\n" : ", ");
}
}
static void
gnuplot_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
{
tsv_print_perf_iteration(cli_args, current_time, last_counters, counters, num_threads);
}
static void
gnuplot_print_perf_totals(const struct cli_args *UU(cli_args), uint64_t *UU(counters[]), const int UU(num_threads))
{
printf("e\n");
}
#endif
const struct perf_formatter perf_formatters[] = { const struct perf_formatter perf_formatters[] = {
[HUMAN] = { [HUMAN] = {
.header = human_print_perf_header, .header = human_print_perf_header,
...@@ -417,6 +454,13 @@ const struct perf_formatter perf_formatters[] = { ...@@ -417,6 +454,13 @@ const struct perf_formatter perf_formatters[] = {
.iteration = tsv_print_perf_iteration, .iteration = tsv_print_perf_iteration,
.totals = tsv_print_perf_totals .totals = tsv_print_perf_totals
}, },
#if 0
[GNUPLOT] = {
.header = gnuplot_print_perf_header,
.iteration = gnuplot_print_perf_iteration,
.totals = gnuplot_print_perf_totals
}
#endif
}; };
static int get_env_open_flags(struct cli_args *args) { static int get_env_open_flags(struct cli_args *args) {
...@@ -487,7 +531,7 @@ static void *worker(void *arg_v) { ...@@ -487,7 +531,7 @@ static void *worker(void *arg_v) {
assert_zero(r); assert_zero(r);
arg->random_data = &random_data; arg->random_data = &random_data;
DB_ENV *env = arg->env; DB_ENV *env = arg->env;
DB_TXN *txn = nullptr; DB_TXN *txn = NULL;
if (verbose) { if (verbose) {
toku_pthread_t self = toku_pthread_self(); toku_pthread_t self = toku_pthread_self();
uintptr_t intself = (uintptr_t) self; uintptr_t intself = (uintptr_t) self;
...@@ -544,6 +588,7 @@ static void *worker(void *arg_v) { ...@@ -544,6 +588,7 @@ static void *worker(void *arg_v) {
return arg; return arg;
} }
typedef struct scan_cb_extra *SCAN_CB_EXTRA;
struct scan_cb_extra { struct scan_cb_extra {
bool fast; bool fast;
int64_t curr_sum; int64_t curr_sum;
...@@ -557,13 +602,13 @@ struct scan_op_extra { ...@@ -557,13 +602,13 @@ struct scan_op_extra {
}; };
static int static int
scan_cb(const DBT *key, const DBT *val, void *arg_v) { scan_cb(const DBT *a, const DBT *b, void *arg_v) {
struct scan_cb_extra *CAST_FROM_VOIDP(cb_extra, arg_v); SCAN_CB_EXTRA CAST_FROM_VOIDP(cb_extra, arg_v);
assert(key); assert(a);
assert(val); assert(b);
assert(cb_extra); assert(cb_extra);
assert(val->size >= sizeof(int64_t)); assert(b->size >= sizeof(int));
cb_extra->curr_sum += *(int64_t *) val->data; cb_extra->curr_sum += *(int *)b->data;
cb_extra->num_elements++; cb_extra->num_elements++;
return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0; return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
} }
...@@ -576,13 +621,12 @@ static int scan_op_and_maybe_check_sum( ...@@ -576,13 +621,12 @@ static int scan_op_and_maybe_check_sum(
) )
{ {
int r = 0; int r = 0;
DBC* cursor = nullptr; DBC* cursor = NULL;
struct scan_cb_extra e = { struct scan_cb_extra e;
e.fast = sce->fast, e.fast = sce->fast;
e.curr_sum = 0, e.curr_sum = 0;
e.num_elements = 0, e.num_elements = 0;
};
{ int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); } { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
if (sce->prefetch) { if (sce->prefetch) {
...@@ -633,68 +677,37 @@ static int generate_row_for_put( ...@@ -633,68 +677,37 @@ static int generate_row_for_put(
return 0; return 0;
} }
static uint64_t breverse(uint64_t v) static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
// Effect: return the bits in i, reversed return 0;
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint64_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
}
static void
fill_key_buf(uint64_t key, uint8_t *data, struct cli_args *args) {
invariant(args->key_size >= MIN_KEY_SIZE);
uint64_t *k = reinterpret_cast<uint64_t *>(data);
if (args->disperse_keys) {
*k = static_cast<uint64_t>(breverse(key));
} else {
*k = key;
}
if (args->key_size > sizeof(uint64_t)) {
memset(data + sizeof(uint64_t), 0, args->key_size - sizeof(uint64_t));
}
} }
static void static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) { size_t s = 256;
uint64_t key = randu64(random_data); void *p = toku_xmalloc(s);
if (arg->bounded_element_range && arg->cli->num_elements > 0) { toku_free(p);
key = key % arg->cli->num_elements; return 0;
}
fill_key_buf(key, data, arg->cli);
} }
static void #if DONT_DEPRECATE_MALLOC
fill_val_buf(int64_t val, uint8_t *data, uint32_t val_size) { static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
invariant(val_size >= MIN_VAL_SIZE); size_t s = 256;
int64_t *v = reinterpret_cast<int64_t *>(data); void *p = malloc(s);
*v = val; free(p);
if (val_size > sizeof(int64_t)) { return 0;
memset(data + sizeof(int64_t), 0, val_size - sizeof(int64_t));
}
} }
#endif
// Fill array with compressibility*size 0s. // Fill array with compressibility*size 0s.
// 0.0<=compressibility<=1.0 // 0.0<=compressibility<=1.0
// Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away). // Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away).
// The rest will be random data. // The rest will be random data.
static void static void
fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_args *args) { fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, double compressibility) {
invariant(args->val_size >= MIN_VAL_SIZE);
//Requires: The array was zeroed since the last time 'size' was changed. //Requires: The array was zeroed since the last time 'size' was changed.
//Requires: compressibility is in range [0,1] indicating fraction that should be zeros. //Requires: compressibility is in range [0,1] indicating fraction that should be zeros.
// Fill in the random bytes uint32_t num_random_bytes = (1 - compressibility) * size;
uint32_t num_random_bytes = (1 - args->compressibility) * args->val_size;
if (num_random_bytes > 0) { if (num_random_bytes > 0) {
uint32_t filled; uint32_t filled;
for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) { for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) {
...@@ -705,28 +718,39 @@ fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_a ...@@ -705,28 +718,39 @@ fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_a
memcpy(&data[filled], &last8, num_random_bytes - filled); memcpy(&data[filled], &last8, num_random_bytes - filled);
} }
} }
}
// Fill in the zero bytes static inline size_t
if (num_random_bytes < args->val_size) { size_t_max(size_t a, size_t b) {
memset(data + num_random_bytes, 0, args->val_size - num_random_bytes); return (a > b) ? a : b;
}
} }
static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) { static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) {
int r = 0; int r = 0;
uint8_t keybuf[arg->cli->key_size]; uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
const int put_flags = get_put_flags(arg->cli);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_key_buf_random(arg->random_data, keybuf, arg); rand_key_key[0] = randu64(arg->random_data);
fill_val_buf_random(arg->random_data, valbuf, arg->cli); if (arg->cli->interleave) {
r = db->put(db, txn, &key, &val, put_flags); rand_key_i[3] = arg->thread_idx;
} else {
rand_key_i[0] = arg->thread_idx;
}
if (arg->cli->num_elements > 0 && arg->bounded_element_range) {
rand_key_key[0] = rand_key_key[0] % arg->cli->num_elements;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
if (!ignore_errors && r != 0) { if (!ignore_errors && r != 0) {
goto cleanup; goto cleanup;
} }
...@@ -736,7 +760,6 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo ...@@ -736,7 +760,6 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo
puts_to_increment = 0; puts_to_increment = 0;
} }
} }
cleanup: cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment); increment_counter(stats_extra, PUTS, puts_to_increment);
return r; return r;
...@@ -765,19 +788,22 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -765,19 +788,22 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int r = 0; int r = 0;
uint8_t keybuf[arg->cli->key_size]; uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
const int put_flags = get_put_flags(arg->cli);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint64_t i = 0; i < arg->cli->txn_size; ++i) { for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_key_buf(i, keybuf, arg->cli); rand_key_key[0] = extra->current++;
fill_val_buf_random(arg->random_data, valbuf, arg->cli); fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
r = db->put(db, txn, &key, &val, put_flags); DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
...@@ -787,7 +813,6 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -787,7 +813,6 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
puts_to_increment = 0; puts_to_increment = 0;
} }
} }
cleanup: cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment); increment_counter(stats_extra, PUTS, puts_to_increment);
return r; return r;
...@@ -802,44 +827,42 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v ...@@ -802,44 +827,42 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v
uint32_t dbt_flags = 0; uint32_t dbt_flags = 0;
r = db_create(&db_load, env, 0); r = db_create(&db_load, env, 0);
assert(r == 0); assert(r == 0);
r = db_load->open(db_load, txn, "loader-db", nullptr, DB_BTREE, DB_CREATE, 0666); r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
DB_LOADER *loader; DB_LOADER *loader;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES; uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
CKERR(r); CKERR(r);
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
fill_key_buf(i, keybuf, arg->cli); DBT key, val;
fill_val_buf_random(arg->random_data, valbuf, arg->cli); int rand_key = i;
int rand_val = myrandom_r(arg->random_data);
dbt_init(&key, &rand_key, sizeof(rand_key));
dbt_init(&val, &rand_val, sizeof(rand_val));
r = loader->put(loader, &key, &val); CKERR(r); r = loader->put(loader, &key, &val); CKERR(r);
} }
r = loader->close(loader); CKERR(r); r = loader->close(loader); CKERR(r);
r = db_load->close(db_load, 0); CKERR(r); r = db_load->close(db_load, 0); CKERR(r);
r = env->dbremove(env, txn, "loader-db", nullptr, 0); CKERR(r); r = env->dbremove(env, txn, "loader-db", NULL, 0); CKERR(r);
} }
return 0; return 0;
} }
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
// Pick a random DB, do a keyrange operation. int r;
// callback is designed to run on tests with one DB
// no particular reason why, just the way it was
// originally done
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int rand_key = myrandom_r(arg->random_data);
int r = 0; if (arg->bounded_element_range) {
uint8_t keybuf[arg->cli->key_size]; rand_key = rand_key % arg->cli->num_elements;
}
DBT key; DBT key;
dbt_init(&key, keybuf, sizeof keybuf); dbt_init(&key, &rand_key, sizeof rand_key);
fill_key_buf_random(arg->random_data, keybuf, arg);
uint64_t less,equal,greater; uint64_t less,equal,greater;
int is_exact; int is_exact;
r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact); r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
...@@ -867,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra ...@@ -867,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r; return r;
} }
struct lock_escalation_op_extra {
// sleep somewhere between these times before running escalation.
// this will add some chaos into the mix.
uint64_t min_sleep_time_micros;
uint64_t max_sleep_time_micros;
};
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct lock_escalation_op_extra *CAST_FROM_VOIDP(extra, operation_extra);
if (extra->max_sleep_time_micros > 0) {
invariant(extra->max_sleep_time_micros >= extra->min_sleep_time_micros);
uint64_t extra_sleep_time = (extra->max_sleep_time_micros - extra->min_sleep_time_micros) + 1;
uint64_t sleep_time = extra->min_sleep_time_micros + (myrandom_r(arg->random_data) % extra_sleep_time);
usleep(sleep_time);
}
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) { static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
...@@ -923,24 +967,23 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con ...@@ -923,24 +967,23 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con
} }
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) { static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
int r = 0; int r;
uint8_t keybuf[arg->cli->key_size]; int rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
DBT key, val; DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf); dbt_init(&key, &rand_key, sizeof rand_key);
dbt_init(&val, nullptr, 0); dbt_init(&val, NULL, 0);
fill_key_buf_random(arg->random_data, keybuf, arg);
r = db->getf_set( r = db->getf_set(
db, db,
txn, txn,
0, 0,
&key, &key,
dbt_do_nothing, dbt_do_nothing,
nullptr NULL
); );
if (check) { if (check) assert(r != DB_NOTFOUND);
assert(r != DB_NOTFOUND);
}
r = 0; r = 0;
return r; return r;
} }
...@@ -968,7 +1011,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext ...@@ -968,7 +1011,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0; int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
DBC* cursor = nullptr; DBC* cursor = NULL;
int r = db->cursor(db, txn, &cursor, 0); assert(r == 0); int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
r = cursor->c_close(cursor); assert(r == 0); r = cursor->c_close(cursor); assert(r == 0);
return 0; return 0;
...@@ -1017,14 +1060,14 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -1017,14 +1060,14 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
void *set_extra), void *set_extra),
void *set_extra) void *set_extra)
{ {
int64_t old_int_val = 0; int old_int_val = 0;
if (old_val) { if (old_val) {
old_int_val = *(int64_t *) old_val->data; old_int_val = *(int*)old_val->data;
} }
assert(extra->size == sizeof(struct update_op_extra)); assert(extra->size == sizeof(struct update_op_extra));
struct update_op_extra *CAST_FROM_VOIDP(e, extra->data); struct update_op_extra *CAST_FROM_VOIDP(e, extra->data);
int64_t new_int_val; int new_int_val;
switch (e->type) { switch (e->type) {
case UPDATE_ADD_DIFF: case UPDATE_ADD_DIFF:
new_int_val = old_int_val + e->u.d.diff; new_int_val = old_int_val + e->u.d.diff;
...@@ -1040,59 +1083,53 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -1040,59 +1083,53 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
assert(false); assert(false);
} }
uint32_t val_size = sizeof(int64_t) + e->pad_bytes;
uint8_t valbuf[val_size];
fill_val_buf(new_int_val, valbuf, val_size);
DBT new_val; DBT new_val;
dbt_init(&new_val, valbuf, val_size); uint32_t data_size = sizeof(int) + e->pad_bytes;
set_val(&new_val, set_extra); char* data [data_size];
ZERO_ARRAY(data);
memcpy(data, &new_int_val, sizeof(new_int_val));
set_val(dbt_init(&new_val, data, data_size), set_extra);
return 0; return 0;
} }
static int UU() update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU()update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
int r = 0;
DBT key, val; DBT key, val;
uint8_t keybuf[arg->cli->key_size]; int rand_key;
int rand_key2;
toku_sync_fetch_and_add(&update_count, 1); toku_sync_fetch_and_add(&update_count, 1);
struct update_op_extra extra; struct update_op_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.type = UPDATE_ADD_DIFF; extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0; extra.pad_bytes = 0;
int64_t curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
fill_key_buf_random(arg->random_data, keybuf, arg); rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % (arg->cli->num_elements/2);
}
rand_key2 = arg->cli->num_elements - rand_key;
assert(rand_key != rand_key2);
extra.u.d.diff = 1; extra.u.d.diff = 1;
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
r = db->update( r = db->update(
db, db,
txn, txn,
&key, dbt_init(&key, &rand_key, sizeof rand_key),
&val, dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0) { if (r != 0) {
return r; return r;
} }
int64_t *rkp = (int64_t *) keybuf;
int64_t rand_key = *rkp;
invariant(rand_key != (arg->cli->num_elements - rand_key));
rand_key -= arg->cli->num_elements;
fill_key_buf(rand_key, keybuf, arg->cli);
extra.u.d.diff = -1; extra.u.d.diff = -1;
r = db->update( r = db->update(
db, db,
txn, txn,
&key, dbt_init(&key, &rand_key2, sizeof rand_key),
&val, dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0) { if (r != 0) {
...@@ -1119,6 +1156,10 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn, ...@@ -1119,6 +1156,10 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn,
// take the given db and do an update on it // take the given db and do an update on it
static int static int
UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int r = 0;
int curr_val_sum = 0;
DBT key, val;
int update_key;
uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1); uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
struct update_op_extra extra; struct update_op_extra extra;
...@@ -1130,14 +1171,7 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1130,14 +1171,7 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
extra.pad_bytes = 100; extra.pad_bytes = 100;
} }
} }
int r = 0;
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int64_t update_key;
int64_t curr_val_sum = 0;
const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0; const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0;
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
if (arg->prelock_updates) { if (arg->prelock_updates) {
if (i == 0) { if (i == 0) {
...@@ -1146,9 +1180,9 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1146,9 +1180,9 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
update_key = update_key % arg->cli->num_elements; update_key = update_key % arg->cli->num_elements;
} }
const int64_t max_key_in_table = arg->cli->num_elements - 1; const uint32_t max_key_in_table = arg->cli->num_elements - 1;
const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table; const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table;
int64_t left_key, right_key; int left_key, right_key;
DBT left_key_dbt, right_key_dbt; DBT left_key_dbt, right_key_dbt;
// acquire the range starting at the random key, plus txn_size - 1 // acquire the range starting at the random key, plus txn_size - 1
...@@ -1184,16 +1218,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1184,16 +1218,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
} }
} else { } else {
update_key++; update_key++;
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
} }
fill_key_buf(update_key, keybuf, arg->cli);
} else { } else {
// just do a usual, random point update without locking first // just do a usual, random point update without locking first
fill_key_buf_random(arg->random_data, keybuf, arg); update_key = myrandom_r(arg->random_data);
} }
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
// the last update keeps the table's sum as zero // the last update keeps the table's sum as zero
// every other update except the last applies a random delta // every other update except the last applies a random delta
...@@ -1208,15 +1241,12 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1208,15 +1241,12 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
} }
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
// do the update // do the update
r = db->update( r = db->update(
db, db,
txn, txn,
&key, dbt_init(&key, &update_key, sizeof update_key),
&val, dbt_init(&val, &extra, sizeof extra),
update_flags update_flags
); );
if (r != 0) { if (r != 0) {
...@@ -1239,11 +1269,12 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext ...@@ -1239,11 +1269,12 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
assert(arg->bounded_element_range); assert(arg->bounded_element_range);
assert(op_args->update_history_buffer); assert(op_args->update_history_buffer);
int r;
int r = 0;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
DBT key, val;
int rand_key;
struct update_op_extra extra; struct update_op_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.type = UPDATE_WITH_HISTORY; extra.type = UPDATE_WITH_HISTORY;
...@@ -1254,44 +1285,47 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext ...@@ -1254,44 +1285,47 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
extra.pad_bytes = 500; extra.pad_bytes = 500;
} }
} }
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int64_t rand_key;
int64_t curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
fill_key_buf_random(arg->random_data, keybuf, arg); rand_key = myrandom_r(arg->random_data) % arg->cli->num_elements;
int64_t *rkp = (int64_t *) keybuf; extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
rand_key = *rkp; // just make every other value random
invariant(rand_key < arg->cli->num_elements); if (i%2 == 0) {
if (i < arg->cli->txn_size - 1) { extra.u.h.new_val = -extra.u.h.new_val;
extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
// just make every other value random
if (i % 2 == 0) {
extra.u.h.new_val = -extra.u.h.new_val;
}
curr_val_sum += extra.u.h.new_val;
} else {
// the last update should ensure the sum stays zero
extra.u.h.new_val = -curr_val_sum;
} }
curr_val_sum += extra.u.h.new_val;
extra.u.h.expected = op_args->update_history_buffer[rand_key]; extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val; op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update( r = db->update(
db, db,
txn, txn,
&key, dbt_init(&key, &rand_key, sizeof rand_key),
&val, dbt_init(&val, &extra, sizeof extra),
0 0
); );
if (r != 0) { if (r != 0) {
return r; return r;
} }
} }
//
// now put in one more to ensure that the sum stays 0
//
extra.u.h.new_val = -curr_val_sum;
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update(
db,
txn,
dbt_init(&key, &rand_key, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
return r;
}
return r; return r;
} }
...@@ -1317,7 +1351,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), ...@@ -1317,7 +1351,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra),
int r; int r;
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
DB* db = arg->dbp[i]; DB* db = arg->dbp[i];
r = db->hot_optimize(db, hot_progress_callback, nullptr); r = db->hot_optimize(db, hot_progress_callback, NULL);
if (run_test) { if (run_test) {
CKERR(r); CKERR(r);
} }
...@@ -1330,8 +1364,6 @@ get_ith_table_name(char *buf, size_t len, int i) { ...@@ -1330,8 +1364,6 @@ get_ith_table_name(char *buf, size_t len, int i) {
snprintf(buf, len, "main%d", i); snprintf(buf, len, "main%d", i);
} }
DB_TXN * const null_txn = 0;
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r; int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
...@@ -1342,12 +1374,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat ...@@ -1342,12 +1374,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat
ZERO_ARRAY(name); ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index); get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0); r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
CKERR(r); CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0); r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0); assert(r == 0);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666); r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
...@@ -1432,7 +1464,6 @@ struct sleep_and_crash_extra { ...@@ -1432,7 +1464,6 @@ struct sleep_and_crash_extra {
bool is_setup; bool is_setup;
bool threads_have_joined; bool threads_have_joined;
}; };
static void *sleep_and_crash(void *extra) { static void *sleep_and_crash(void *extra) {
sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra); sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra);
toku_mutex_lock(&e->mutex); toku_mutex_lock(&e->mutex);
...@@ -1477,7 +1508,7 @@ static int run_workers( ...@@ -1477,7 +1508,7 @@ static int run_workers(
int r; int r;
const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format]; const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format];
toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER; toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER;
toku_mutex_init(&mutex, nullptr); toku_mutex_init(&mutex, NULL);
struct rwlock rwlock; struct rwlock rwlock;
rwlock_init(&rwlock); rwlock_init(&rwlock);
toku_pthread_t tids[num_threads]; toku_pthread_t tids[num_threads];
...@@ -1502,11 +1533,11 @@ static int run_workers( ...@@ -1502,11 +1533,11 @@ static int run_workers(
worker_extra[i].operation_lock_mutex = &mutex; worker_extra[i].operation_lock_mutex = &mutex;
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters); XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
TOKU_DRD_IGNORE_VAR(worker_extra[i].counters); TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
{ int chk_r = toku_pthread_create(&tids[i], nullptr, worker, &worker_extra[i]); CKERR(chk_r); } { int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) tids[i]); printf("%lu created\n", (unsigned long) tids[i]);
} }
{ int chk_r = toku_pthread_create(&time_tid, nullptr, test_time, &tte); CKERR(chk_r); } { int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) time_tid); printf("%lu created\n", (unsigned long) time_tid);
...@@ -1629,22 +1660,131 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1629,22 +1660,131 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db->set_readpagesize(db, env_args.basement_node_size); r = db->set_readpagesize(db, env_args.basement_node_size);
CKERR(r); CKERR(r);
const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0); const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666); r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666);
CKERR(r); CKERR(r);
db_res[i] = db; db_res[i] = db;
} }
return r; return r;
} }
static void report_overall_fill_table_progress(struct cli_args *args, int num_rows) { static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bufsz, int val_bufsz,
void (*callback)(int idx, void *extra,
void *key, int *keysz,
void *val, int *valsz),
void *extra, void (*progress_cb)(int num_rows)) {
DB_TXN *txn = nullptr;
const int puts_per_txn = 100000;
int r = 0;
for (long i = 0; i < num_elements; ++i) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
char keybuf[key_bufsz], valbuf[val_bufsz];
memset(keybuf, 0, sizeof(keybuf));
memset(valbuf, 0, sizeof(valbuf));
int keysz, valsz;
callback(i, extra, keybuf, &keysz, valbuf, &valsz);
// let's make sure the data stored fits in the buffers we passed in
assert(keysz <= key_bufsz);
assert(valsz <= val_bufsz);
DBT key, val;
// make size of data what is specified w/input parameters
// note that key and val have sizes of
// key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback
r = db->put(
db,
txn,
dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz),
// don't bother taking locks in the locktree
DB_PRELOCKED_WRITE
);
assert(r == 0);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
progress_cb(puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC);
invariant_zero(r);
}
return r;
}
static uint32_t breverse(uint32_t v)
// Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint32_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
}
static void zero_element_callback(int idx, void *extra, void *keyv, int *keysz, void *valv, int *valsz) {
const bool *disperse_keys = static_cast<bool *>(extra);
int *CAST_FROM_VOIDP(key, keyv);
int *CAST_FROM_VOIDP(val, valv);
if (*disperse_keys) {
*key = static_cast<int>(breverse(idx));
} else {
*key = idx;
}
*val = 0;
*keysz = sizeof(int);
*valsz = sizeof(int);
}
struct fill_table_worker_info {
DB_ENV *env;
DB *db;
int num_elements;
uint32_t key_size;
uint32_t val_size;
bool disperse_keys;
void (*progress_cb)(int num_rows);
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
int r = fill_table_from_fun(
info->env,
info->db,
info->num_elements,
info->key_size,
info->val_size,
zero_element_callback,
&info->disperse_keys,
info->progress_cb
);
invariant_zero(r);
toku_free(info);
}
static int num_tables_to_fill = 1;
static int rows_per_table = 1;
static void report_overall_fill_table_progress(int num_rows) {
// for sanitary reasons we'd like to prevent two threads // for sanitary reasons we'd like to prevent two threads
// from printing the same performance report twice. // from printing the same performance report twice.
static bool reporting; static bool reporting;
// when was the first time measurement taken? // when was the first time measurement taken?
static uint64_t t0; static uint64_t t0;
static int rows_inserted; static int rows_inserted;
// when was the last report? what was its progress? // when was the last report? what was its progress?
static uint64_t last_report; static uint64_t last_report;
static double last_progress; static double last_progress;
...@@ -1654,9 +1794,12 @@ static void report_overall_fill_table_progress(struct cli_args *args, int num_ro ...@@ -1654,9 +1794,12 @@ static void report_overall_fill_table_progress(struct cli_args *args, int num_ro
} }
uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows); uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows);
double progress = rows_so_far / (args->num_elements * args->num_DBs * 1.0); double progress = rows_so_far /
(rows_per_table * num_tables_to_fill * 1.0);
if (progress > (last_progress + .01)) { if (progress > (last_progress + .01)) {
uint64_t t1 = toku_current_time_microsec(); uint64_t t1 = toku_current_time_microsec();
// report no more often than once every 5 seconds, for less output.
// there is a race condition. it is probably harmless.
const uint64_t minimum_report_period = 5 * 1000000; const uint64_t minimum_report_period = 5 * 1000000;
if (t1 > last_report + minimum_report_period if (t1 > last_report + minimum_report_period
&& toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) { && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
...@@ -1670,68 +1813,24 @@ static void report_overall_fill_table_progress(struct cli_args *args, int num_ro ...@@ -1670,68 +1813,24 @@ static void report_overall_fill_table_progress(struct cli_args *args, int num_ro
} }
} }
static void fill_single_table(DB_ENV *env, DB *db, struct cli_args *args, bool fill_with_zeroes) { static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_elements, uint32_t key_size, uint32_t val_size, bool disperse_keys) {
const int puts_per_txn = 100000; // set the static globals that the progress reporter uses
num_tables_to_fill = num_DBs;
int r = 0; rows_per_table = num_elements;
DB_TXN *txn = nullptr;
struct random_data random_data;
char random_buf[8];
r = myinitstate_r(random(), random_buf, 8, &random_data); CKERR(r);
uint8_t keybuf[args->key_size], valbuf[args->val_size];
memset(keybuf, 0, sizeof keybuf);
memset(valbuf, 0, sizeof valbuf);
DBT key, val;
dbt_init(&key, keybuf, args->key_size);
dbt_init(&val, valbuf, args->val_size);
for (int i = 0; i < args->num_elements; i++) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
fill_key_buf(i, keybuf, args);
if (!fill_with_zeroes) {
fill_val_buf_random(&random_data, valbuf, args);
}
r = db->put(db, txn, &key, &val, DB_PRELOCKED_WRITE); CKERR(r);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
report_overall_fill_table_progress(args, puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
}
}
struct fill_table_worker_info {
struct cli_args *args;
DB_ENV *env;
DB *db;
bool fill_with_zeroes;
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
fill_single_table(info->env, info->db, info->args, info->fill_with_zeroes);
toku_free(info);
}
static int fill_tables(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) {
const int num_cores = toku_os_get_number_processors(); const int num_cores = toku_os_get_number_processors();
KIBBUTZ kibbutz = toku_kibbutz_create(num_cores); KIBBUTZ kibbutz = toku_kibbutz_create(num_cores);
for (int i = 0; i < args->num_DBs; i++) { for (int i = 0; i < num_DBs; i++) {
assert(key_size >= sizeof(int));
assert(val_size >= sizeof(int));
struct fill_table_worker_info *XCALLOC(info); struct fill_table_worker_info *XCALLOC(info);
info->env = env; info->env = env;
info->db = dbs[i]; info->db = dbs[i];
info->args = args; info->num_elements = num_elements;
info->fill_with_zeroes = fill_with_zeroes; info->key_size = key_size;
info->val_size = val_size;
info->disperse_keys = disperse_keys;
info->progress_cb = report_overall_fill_table_progress;
toku_kibbutz_enq(kibbutz, fill_table_worker, info); toku_kibbutz_enq(kibbutz, fill_table_worker, info);
} }
toku_kibbutz_destroy(kibbutz); toku_kibbutz_destroy(kibbutz);
...@@ -1766,6 +1865,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1766,6 +1865,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int r; int r;
struct env_args env_args = cli_args->env_args; struct env_args env_args = cli_args->env_args;
/* create the dup database file */
DB_ENV *env; DB_ENV *env;
db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes); db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
...@@ -1773,6 +1873,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1773,6 +1873,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
env->set_update(env, env_args.update_function); env->set_update(env, env_args.update_function);
// set the cache size to 10MB
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r); r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
if (env_args.generate_put_callback) { if (env_args.generate_put_callback) {
...@@ -1803,7 +1904,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1803,7 +1904,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db_create(&db, env, 0); r = db_create(&db, env, 0);
CKERR(r); CKERR(r);
const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0; const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0;
r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666); r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666);
CKERR(r); CKERR(r);
db_res[i] = db; db_res[i] = db;
} }
...@@ -1831,8 +1932,8 @@ static const struct env_args DEFAULT_ENV_ARGS = { ...@@ -1831,8 +1932,8 @@ static const struct env_args DEFAULT_ENV_ARGS = {
.num_bucket_mutexes = 1024, .num_bucket_mutexes = 1024,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = update_op_callback, .update_function = update_op_callback,
.generate_put_callback = nullptr, .generate_put_callback = NULL,
.generate_del_callback = nullptr, .generate_del_callback = NULL,
}; };
static const struct env_args DEFAULT_PERF_ENV_ARGS = { static const struct env_args DEFAULT_PERF_ENV_ARGS = {
...@@ -1846,11 +1947,15 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = { ...@@ -1846,11 +1947,15 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = {
.cachetable_size = 1<<30, .cachetable_size = 1<<30,
.num_bucket_mutexes = 1024 * 1024, .num_bucket_mutexes = 1024 * 1024,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = nullptr, .update_function = NULL,
.generate_put_callback = nullptr, .generate_put_callback = NULL,
.generate_del_callback = nullptr, .generate_del_callback = NULL,
}; };
#define MIN_VAL_SIZE sizeof(int)
#define MIN_KEY_SIZE sizeof(int)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
static struct cli_args UU() get_default_args(void) { static struct cli_args UU() get_default_args(void) {
struct cli_args DEFAULT_ARGS = { struct cli_args DEFAULT_ARGS = {
.num_elements = 150000, .num_elements = 150000,
...@@ -2222,7 +2327,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2222,7 +2327,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
#define LOCAL_STRING_ARG(name_string, variable, default) \ #define LOCAL_STRING_ARG(name_string, variable, default) \
MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "") MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "")
const char *perf_format_s = nullptr; const char *perf_format_s = NULL;
struct arg_type arg_types[] = { struct arg_type arg_types[] = {
INT32_ARG_NONNEG("--num_elements", num_elements, ""), INT32_ARG_NONNEG("--num_elements", num_elements, ""),
INT32_ARG_NONNEG("--num_DBs", num_DBs, ""), INT32_ARG_NONNEG("--num_DBs", num_DBs, ""),
...@@ -2334,15 +2439,20 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2334,15 +2439,20 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
} }
} }
} }
if (perf_format_s != nullptr) { if (perf_format_s != NULL) {
if (!strcmp(perf_format_s, "human")) { if (!strcmp(perf_format_s, "human")) {
args->perf_output_format = HUMAN; args->perf_output_format = HUMAN;
} else if (!strcmp(perf_format_s, "csv")) { } else if (!strcmp(perf_format_s, "csv")) {
args->perf_output_format = CSV; args->perf_output_format = CSV;
} else if (!strcmp(perf_format_s, "tsv")) { } else if (!strcmp(perf_format_s, "tsv")) {
args->perf_output_format = TSV; args->perf_output_format = TSV;
#if 0
} else if (!strcmp(perf_format_s, "gnuplot")) {
args->perf_output_format = GNUPLOT;
#endif
} else { } else {
fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n"); fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n");
//fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", \"tsv\", and \"gnuplot\"\n");
do_usage(argv0, num_arg_types, arg_types); do_usage(argv0, num_arg_types, arg_types);
exit(EINVAL); exit(EINVAL);
} }
...@@ -2358,24 +2468,35 @@ static void ...@@ -2358,24 +2468,35 @@ static void
stress_table(DB_ENV *, DB **, struct cli_args *); stress_table(DB_ENV *, DB **, struct cli_args *);
static int static int
stress_dbt_cmp(DB *db, const DBT *a, const DBT *b) { UU() stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
assert(db && a && b); assert(db && a && b);
assert(a->size >= sizeof(int));
assert(b->size >= sizeof(int));
int x = *(int *) a->data;
int y = *(int *) b->data;
if (x<y) return -1;
if (x>y) return 1;
return 0;
}
// Keys are only compared by their first 8 bytes, static int
// interpreted as a little endian 64 bit integer.. UU() stress_uint64_dbt_cmp(DB *db, const DBT *a, const DBT *b) {
assert(a->size >= sizeof(int64_t)); assert(db && a && b);
assert(b->size >= sizeof(int64_t)); assert(a->size >= sizeof(uint64_t));
assert(b->size >= sizeof(uint64_t));
int64_t x = *(int64_t *) a->data; uint64_t x = *(uint64_t *) a->data;
int64_t y = *(int64_t *) b->data; uint64_t y = *(uint64_t *) b->data;
if (x < y) { if (x < y) {
return -1; return -1;
} else if (x > y) { }
if (x > y) {
return +1; return +1;
} else {
return 0;
} }
return 0;
} }
...@@ -2391,49 +2512,21 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args) ...@@ -2391,49 +2512,21 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
scan_arg.operation_extra = &soe; scan_arg.operation_extra = &soe;
scan_arg.operation = scan_op_no_check; scan_arg.operation = scan_op_no_check;
scan_arg.lock_type = STRESS_LOCK_NONE; scan_arg.lock_type = STRESS_LOCK_NONE;
DB_TXN* txn = nullptr; DB_TXN* txn = NULL;
// don't take serializable read locks when scanning. // don't take serializable read locks when scanning.
int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r); int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
// make sure the scan doesn't terminate early // make sure the scan doesn't terminate early
run_test = true; run_test = true;
// warm up each DB in parallel // warm up each DB in parallel
scan_op_no_check_parallel(txn, &scan_arg, &soe, nullptr); scan_op_no_check_parallel(txn, &scan_arg, &soe, NULL);
r = txn->commit(txn,0); CKERR(r); r = txn->commit(txn,0); CKERR(r);
} }
static void static void
UU() stress_recover(struct cli_args *args) { UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, const DBT *, const DBT *))
DB_ENV* env = nullptr;
DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = nullptr;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, nullptr);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
static void
test_main(struct cli_args *args, bool fill_with_zeroes)
{ {
{ char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); } { char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); }
DB_ENV* env = nullptr; DB_ENV* env = NULL;
DB* dbs[args->num_DBs]; DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs)); memset(dbs, 0, sizeof(dbs));
db_env_enable_engine_status(args->nocrashstatus ? false : true); db_env_enable_engine_status(args->nocrashstatus ? false : true);
...@@ -2442,17 +2535,17 @@ test_main(struct cli_args *args, bool fill_with_zeroes) ...@@ -2442,17 +2535,17 @@ test_main(struct cli_args *args, bool fill_with_zeroes)
&env, &env,
dbs, dbs,
args->num_DBs, args->num_DBs,
stress_dbt_cmp, bt_compare,
args args
); );
{ int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); } { int chk_r = fill_tables_with_zeroes(env, dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size, args->disperse_keys); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
} }
if (!args->only_create) { if (!args->only_create) {
{ int chk_r = open_tables(&env, { int chk_r = open_tables(&env,
dbs, dbs,
args->num_DBs, args->num_DBs,
stress_dbt_cmp, bt_compare,
args); CKERR(chk_r); } args); CKERR(chk_r); }
if (args->warm_cache) { if (args->warm_cache) {
do_warm_cache(env, dbs, args); do_warm_cache(env, dbs, args);
...@@ -2463,17 +2556,37 @@ test_main(struct cli_args *args, bool fill_with_zeroes) ...@@ -2463,17 +2556,37 @@ test_main(struct cli_args *args, bool fill_with_zeroes)
} }
static void static void
UU() stress_test_main(struct cli_args *args) { UU() stress_test_main(struct cli_args *args)
// Begin the test with fixed size values equal to zero. {
// This is important for correctness testing. stress_test_main_with_cmp(args, stress_int_dbt_cmp);
test_main(args, true);
} }
static void static void
UU() perf_test_main(struct cli_args *args) { UU() stress_recover(struct cli_args *args) {
// Do not begin the test by creating a table of all zeroes. DB_ENV* env = NULL;
// We want to control the row size and its compressibility. DB* dbs[args->num_DBs];
test_main(args, false); memset(dbs, 0, sizeof(dbs));
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_int_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = NULL;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, NULL);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment