Commit e0796d87 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

fixes #5702 finalize perf_iibench by adding a true pk + 3 secondary key...

fixes #5702 finalize perf_iibench by adding a true pk + 3 secondary key schema. the primary key is 8 bytes and stores 3 columns as value data. secondary keys are 16 bytes (one column plus the pk) and store no value data (non-clustering)


git-svn-id: file:///svn/toku/tokudb@52482 c7de825b-a66e-492c-adef-691d508d4ae1
parent cbd13675
......@@ -10,56 +10,113 @@
#include "threaded_stress_test_helpers.h"
//
// This test tries to emulate iibench at the ydb layer. There is one
// unique index with an auto-increment key, plus several non-unique
// secondary indexes with random keys.
// This test tries to emulate iibench at the ydb layer.
//
// The schema is simple:
// 8 byte primary key
// 8 byte key A
// 8 byte key B
// 8 byte key C
//
// There's one primary DB for the pk and three secondary DBs.
//
// The primary key stores the other columns as the value.
// The secondary keys have the primary key appended to them.
//
static const int iibench_num_dbs = 4;
static const size_t iibench_secondary_key_size = sizeof(uint64_t) * 2;
struct iibench_row {
int64_t pk;
int64_t a;
int64_t b;
int64_t c;
};
static int64_t hash(int64_t key) {
int64_t hash = 0;
char *buf = (char *) &key;
for (int i = 0; i < 8; i++) {
hash += (((buf[i] + 1) * 17) & 0xFF) << (i * 8);
}
return hash;
}
static void iibench_generate_secondary_keys(int64_t pk, struct iibench_row *row) {
row->a = hash(pk);
row->b = hash(pk * 2);
row->c = hash(pk * 3);
}
static void iibench_verify_row(struct iibench_row *row) {
(void) iibench_verify_row;
struct iibench_row expected_row;
iibench_generate_secondary_keys(row->pk, &expected_row);
invariant(row->a == expected_row.a);
invariant(row->b == expected_row.b);
invariant(row->c == expected_row.c);
}
static void iibench_fill_key_buf(int64_t pk, int64_t *buf) {
memcpy(&buf[0], &pk, sizeof(int64_t));
}
static void iibench_fill_val_buf(int64_t pk, int64_t *buf) {
struct iibench_row row;
iibench_generate_secondary_keys(pk, &row);
memcpy(&buf[0], &row.a, sizeof(row.a));
memcpy(&buf[1], &row.b, sizeof(row.b));
memcpy(&buf[2], &row.c, sizeof(row.c));
}
struct iibench_op_extra {
uint64_t autoincrement;
};
static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) {
const int num_dbs = arg->cli->num_DBs;
DB **dbs = arg->dbp;
DB_ENV *env = arg->env;
DBT mult_key_dbt[num_dbs];
DBT mult_val_dbt[num_dbs];
uint32_t mult_put_flags[num_dbs];
DBT mult_key_dbt[iibench_num_dbs];
DBT mult_val_dbt[iibench_num_dbs];
uint32_t mult_put_flags[iibench_num_dbs];
memset(mult_key_dbt, 0, sizeof(mult_key_dbt));
memset(mult_val_dbt, 0, sizeof(mult_val_dbt));
// The first index is unique with serial autoincrement keys.
// The rest are have keys generated with this thread's random data.
mult_put_flags[0] = get_put_flags(arg->cli) | DB_NOOVERWRITE;
dbs[0]->app_private = nullptr;
for (int i = 1; i < num_dbs; i++) {
for (int i = 1; i < iibench_num_dbs; i++) {
// Secondary keys have the primary key appended to them.
mult_key_dbt[i].size = iibench_secondary_key_size;
mult_key_dbt[i].data = toku_xmalloc(iibench_secondary_key_size);
mult_key_dbt[i].flags = DB_DBT_REALLOC;
mult_key_dbt[i].size = sizeof(uint64_t);
mult_key_dbt[i].data = toku_xmalloc(mult_key_dbt[i].size);
mult_put_flags[i] = get_put_flags(arg->cli);
dbs[i]->app_private = arg->random_data;
}
int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra);
// Get a random primary key, generate secondary key columns in valbuf
uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
fill_key_buf(pk, keybuf, arg->cli);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
int64_t keybuf[1];
int64_t valbuf[3];
iibench_fill_key_buf(pk, keybuf);
iibench_fill_val_buf(pk, valbuf);
dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
r = env->put_multiple(
env,
dbs[0], // source db.
txn,
&mult_key_dbt[0], // source db key
&mult_val_dbt[0], // source db value
num_dbs, // total number of dbs
iibench_num_dbs, // total number of dbs
dbs, // array of dbs
mult_key_dbt, // array of keys
mult_val_dbt, // array of values
......@@ -76,14 +133,14 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
}
cleanup:
for (int i = 1; i < num_dbs; i++) {
for (int i = 1; i < iibench_num_dbs; i++) {
toku_free(mult_key_dbt[i].data);
}
return r;
}
static void
stress_table(DB_ENV* env, DB** UU(dbp), struct cli_args *cli_args) {
stress_table(DB_ENV* env, DB **dbs, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_put_threads;
struct iibench_op_extra iib_extra = {
......@@ -91,23 +148,52 @@ stress_table(DB_ENV* env, DB** UU(dbp), struct cli_args *cli_args) {
};
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
arg_init(&myargs[i], dbs, env, cli_args);
myargs[i].operation = iibench_put_op;
myargs[i].operation_extra = &iib_extra;
}
for (int i = 0; i < iibench_num_dbs; i++) {
DB *db = dbs[i];
DBT desc_dbt;
desc_dbt.data = &i;
desc_dbt.size = sizeof(i);
desc_dbt.ulen = 0;
desc_dbt.flags = 0;
int r = db->change_descriptor(db, nullptr, &desc_dbt, 0);
invariant_zero(r);
}
// Close and reopen the tables to get the descriptors to change properly
close_and_reopen_tables(env, dbs, cli_args);
const bool crash_at_end = false;
run_workers(myargs, num_threads, cli_args->num_seconds, crash_at_end, cli_args);
}
static int iibench_generate_row_for_put(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *UU(src_key), const DBT *src_val) {
DESCRIPTOR desc = dest_db->cmp_descriptor;
invariant(src_db != dest_db);
invariant(dest_db->app_private != nullptr);
invariant(dest_key->size == sizeof(uint64_t));
invariant_notnull(src_key->data);
invariant(src_key->size == sizeof(int64_t));
invariant(dest_key->size == iibench_secondary_key_size);
invariant(dest_key->flags == DB_DBT_REALLOC);
struct random_data *CAST_FROM_VOIDP(r_data, dest_db->app_private);
uint64_t key = randu64(r_data);
memcpy(dest_key->data, &key, sizeof(key));
invariant_notnull(desc->dbt.data);
invariant(desc->dbt.size == sizeof(int));
// Get the column index from the descriptor. This is a secondary index
// so it has to be greater than zero (which would be the pk). Then
// grab the appropriate secondary key from the source val, which is
// an array of the 3 columns, so we have to subtract 1 from the index.
int column_index;
memcpy(&column_index, desc->dbt.data, desc->dbt.size);
invariant(column_index > 0 && column_index < 4);
int64_t *CAST_FROM_VOIDP(columns, src_val->data);
int64_t secondary_key = columns[column_index - 1];
// First write down the secondary key, then the primary key (in src_key)
int64_t *CAST_FROM_VOIDP(dest_key_buf, dest_key->data);
memcpy(&dest_key_buf[0], &secondary_key, sizeof(secondary_key));
memcpy(&dest_key_buf[1], src_key->data, src_key->size);
*dest_val = *src_val;
return 0;
}
......@@ -116,12 +202,13 @@ int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
args.num_elements = 0; // want to start with empty DBs
// In MySQL, iibench has one primary key and three secondaries and does 1k inserts per txn.
args.num_DBs = 4;
// Puts per transaction is configurable. It defaults to 1k.
args.txn_size = 1000;
args.key_size = 8;
args.val_size = 8;
parse_stress_test_args(argc, argv, &args);
// The index count and schema are not configurable. Silently ignore whatever was passed in.
args.num_DBs = 4;
args.key_size = 8;
args.val_size = 32;
// when there are multiple threads, its valid for two of them to
// generate the same key and one of them fail with DB_LOCK_NOTGRANTED
if (args.num_put_threads > 1) {
......
......@@ -2628,6 +2628,19 @@ UU() stress_recover(struct cli_args *args) {
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
static void
close_and_reopen_tables(DB_ENV *env, DB **dbs, struct cli_args *args) {
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_cmp,
args); CKERR(chk_r); }
if (args->warm_cache) {
do_warm_cache(env, dbs, args);
}
}
static void
test_main(struct cli_args *args, bool fill_with_zeroes)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment