Commit 01068d0c authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

fixes #4752 support blackhole option in tokudb, add it to the stress test...

fixes #4752 support blackhole option in tokudb, add it to the stress test framework as well as an option for prelocked write. we also fix the flags api in the ft layer to return void since they can't fail.


git-svn-id: file:///svn/toku/tokudb@46644 c7de825b-a66e-492c-adef-691d508d4ae1
parent 16b31e15
......@@ -131,6 +131,9 @@ static void print_defines (void) {
dodefine(DB_ARCH_ABS);
dodefine(DB_ARCH_LOG);
#define DB_BLACKHOLE 0x0001000 /* unused common bit according to BDB */
dodefine(DB_BLACKHOLE);
dodefine(DB_CREATE);
dodefine(DB_CXX_NO_EXCEPTIONS);
dodefine(DB_EXCL);
......
......@@ -467,6 +467,9 @@ struct ft {
int panic;
// A malloced string that can indicate what went wrong.
char *panic_string;
// is this ft a blackhole? if so, all messages are dropped.
bool blackhole;
};
// Copy the descriptor into a temporary variable, and tell DRD that subsequent code happens after reading that pointer.
......
......@@ -2453,16 +2453,21 @@ static void push_something_at_root (FT h, FTNODE *nodep, FT_MSG cmd)
}
int
toku_ft_root_put_cmd (FT h, FT_MSG_S * cmd)
toku_ft_root_put_cmd (FT ft, FT_MSG_S * cmd)
// Effect:
// - assign msn to cmd
// - push the cmd into the brt
// - cmd will set new msn in tree
{
// blackhole fractal trees drop all messages, so do nothing.
if (ft->blackhole) {
return 0;
}
FTNODE node;
CACHEKEY root_key;
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
assert(h);
assert(ft);
//
// As of Dr. Noga, the following code is currently protected by two locks:
// - the ydb lock
......@@ -2492,16 +2497,16 @@ toku_ft_root_put_cmd (FT h, FT_MSG_S * cmd)
// others
//
{
toku_ft_grab_treelock(h);
toku_ft_grab_treelock(ft);
uint32_t fullhash;
toku_calculate_root_offset_pointer(h, &root_key, &fullhash);
toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
// get the root node
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
fill_bfe_for_full_read(&bfe, ft);
toku_pin_ftnode_off_client_thread(
h,
ft,
root_key,
fullhash,
&bfe,
......@@ -2520,28 +2525,28 @@ toku_ft_root_put_cmd (FT h, FT_MSG_S * cmd)
//VERIFY_NODE(brt, node);
assert(node->fullhash==fullhash);
ft_verify_flags(h, node);
ft_verify_flags(ft, node);
// first handle a reactive root, then put in the message
CACHEKEY new_root_key;
bool root_changed = ft_process_maybe_reactive_root(h, &new_root_key, &node);
bool root_changed = ft_process_maybe_reactive_root(ft, &new_root_key, &node);
if (root_changed) {
toku_ft_set_new_root_blocknum(h, new_root_key);
toku_ft_set_new_root_blocknum(ft, new_root_key);
}
toku_ft_release_treelock(h);
toku_ft_release_treelock(ft);
}
push_something_at_root(h, &node, cmd);
push_something_at_root(ft, &node, cmd);
// verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
// if we call flush_some_child, then that function unpins the root
// otherwise, we unpin ourselves
if (node->height > 0 && toku_ft_nonleaf_is_gorged(node)) {
flush_node_on_background_thread(h, node);
flush_node_on_background_thread(ft, node);
}
else {
toku_unpin_ftnode(h, node); // unpin root
toku_unpin_ftnode(ft, node); // unpin root
}
return 0;
......@@ -3389,17 +3394,13 @@ toku_ft_get_dictionary_id(FT_HANDLE brt) {
return dict_id;
}
int toku_ft_set_flags(FT_HANDLE brt, unsigned int flags) {
assert(flags==(flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags
brt->did_set_flags = true;
brt->options.flags = flags;
return 0;
void toku_ft_set_flags(FT_HANDLE ft_handle, unsigned int flags) {
ft_handle->did_set_flags = true;
ft_handle->options.flags = flags;
}
int toku_ft_get_flags(FT_HANDLE brt, unsigned int *flags) {
*flags = brt->options.flags;
assert(brt->options.flags==(brt->options.flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags
return 0;
void toku_ft_get_flags(FT_HANDLE ft_handle, unsigned int *flags) {
*flags = ft_handle->options.flags;
}
void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, unsigned int *max_val_len)
......@@ -3456,7 +3457,6 @@ void toku_ft_set_redirect_callback(FT_HANDLE brt, on_redirect_callback redir_cb,
brt->redirect_callback_extra = extra;
}
int toku_ft_set_update(FT_HANDLE brt, ft_update_func update_fun) {
brt->options.update_fun = update_fun;
return 0;
......
......@@ -41,8 +41,8 @@ int toku_ft_change_descriptor(FT_HANDLE t, const DBT* old_descriptor, const DBT*
uint32_t toku_serialize_descriptor_size(const DESCRIPTOR desc);
int toku_ft_handle_create(FT_HANDLE *) __attribute__ ((warn_unused_result));
int toku_ft_set_flags(FT_HANDLE, unsigned int flags) __attribute__ ((warn_unused_result));
int toku_ft_get_flags(FT_HANDLE, unsigned int *flags) __attribute__ ((warn_unused_result));
void toku_ft_set_flags(FT_HANDLE, unsigned int flags);
void toku_ft_get_flags(FT_HANDLE, unsigned int *flags);
void toku_ft_handle_set_nodesize(FT_HANDLE, unsigned int nodesize);
void toku_ft_handle_get_nodesize(FT_HANDLE, unsigned int *nodesize);
void toku_ft_get_maximum_advised_key_value_lengths(unsigned int *klimit, unsigned int *vlimit);
......
......@@ -1055,4 +1055,7 @@ void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp
toku_ft_unlock(ft);
}
// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
ft_handle->ft->blackhole = true;
}
......@@ -109,5 +109,7 @@ void toku_ft_get_basementnodesize(FT ft, unsigned int *basementnodesize);
void toku_ft_set_compression_method(FT ft, enum toku_compression_method method);
void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp);
// mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle);
#endif
......@@ -273,8 +273,7 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, bool must_create
r = toku_ft_handle_create(&brt);
assert(r == 0);
r = toku_ft_set_flags(brt, treeflags);
assert(r == 0);
toku_ft_set_flags(brt, treeflags);
if (nodesize != 0) {
toku_ft_handle_set_nodesize(brt, nodesize);
......
......@@ -66,6 +66,7 @@ struct env_args {
int checkpointing_period;
int cleaner_period;
int cleaner_iterations;
int64_t lk_max_memory;
uint64_t cachetable_size;
const char *envdir;
test_update_callback_f update_function; // update callback function
......@@ -112,6 +113,8 @@ struct cli_args {
struct env_args env_args; // specifies environment variables
bool single_txn;
bool warm_cache; // warm caches before running stress_table
bool blackhole; // all message injects are no-ops. helps measure txn/logging/locktree overhead.
bool prelocked_write; // use prelocked_write flag for insertions, avoiding the locktree
};
struct arg {
......@@ -701,7 +704,8 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, void *stats_extra) {
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
r = db->put(db, txn, &key, &val, 0);
int flags = arg->cli->prelocked_write ? DB_PRELOCKED_WRITE : 0;
r = db->put(db, txn, &key, &val, flags);
if (r != 0) {
goto cleanup;
}
......@@ -814,7 +818,8 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
r = db->put(db, txn, &key, &val, 0);
int flags = arg->cli->prelocked_write ? DB_PRELOCKED_WRITE : 0;
r = db->put(db, txn, &key, &val, flags);
if (r != 0) {
goto cleanup;
}
......@@ -1398,9 +1403,10 @@ static int run_workers(
static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int (*bt_compare)(DB *, const DBT *, const DBT *),
struct env_args env_args
struct cli_args *cli_args
) {
int r;
struct env_args env_args = cli_args->env_args;
char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir);
r = system(rmcmd);
......@@ -1411,6 +1417,7 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db_env_create(&env, 0); assert(r == 0);
r = env->set_redzone(env, 0); CKERR(r);
r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
if (env_args.generate_put_callback) {
r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
......@@ -1430,7 +1437,6 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
*env_res = env;
for (int i = 0; i < num_DBs; i++) {
DB *db;
char name[30];
......@@ -1444,7 +1450,8 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
CKERR(r);
r = db->set_readpagesize(db, env_args.basement_node_size);
CKERR(r);
r = db->open(db, null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666);
CKERR(r);
db_res[i] = db;
}
......@@ -1533,14 +1540,16 @@ static void do_xa_recovery(DB_ENV* env) {
static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int (*bt_compare)(DB *, const DBT *, const DBT *),
struct env_args env_args) {
struct cli_args *cli_args) {
int r;
struct env_args env_args = cli_args->env_args;
/* create the dup database file */
DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0);
r = env->set_redzone(env, 0); CKERR(r);
r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
env->set_update(env, env_args.update_function);
// set the cache size to 10MB
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
......@@ -1571,7 +1580,8 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
get_ith_table_name(name, sizeof(name), i);
r = db_create(&db, env, 0);
CKERR(r);
r = db->open(db, null_txn, name, NULL, DB_BTREE, 0, 0666);
const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0;
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666);
CKERR(r);
db_res[i] = db;
}
......@@ -1593,6 +1603,7 @@ static const struct env_args DEFAULT_ENV_ARGS = {
.checkpointing_period = 10,
.cleaner_period = 1,
.cleaner_iterations = 1,
.lk_max_memory = 1 * 1024 * 1024,
.cachetable_size = 300000,
.envdir = ENVDIR,
.update_function = update_op_callback,
......@@ -1606,6 +1617,7 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = {
.checkpointing_period = 60,
.cleaner_period = 1,
.cleaner_iterations = 5,
.lk_max_memory = 1L * 1024 * 1024 * 1024,
.cachetable_size = 1<<30,
.envdir = ENVDIR,
.update_function = NULL,
......@@ -1645,6 +1657,8 @@ static struct cli_args UU() get_default_args(void) {
.env_args = DEFAULT_ENV_ARGS,
.single_txn = false,
.warm_cache = false,
.blackhole = false,
.prelocked_write = false
};
return DEFAULT_ARGS;
}
......@@ -2016,6 +2030,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
BOOL_ARG("only_stress", only_stress),
BOOL_ARG("test", do_test_and_crash),
BOOL_ARG("recover", do_recover),
BOOL_ARG("blackhole", blackhole),
BOOL_ARG("prelocked_write", prelocked_write),
STRING_ARG("--envdir", env_args.envdir),
LOCAL_STRING_ARG("--perf_format", perf_format_s, "human"),
......@@ -2167,7 +2183,7 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co
dbs,
args->num_DBs,
bt_compare,
args->env_args
args
);
{ int chk_r = fill_tables_with_zeroes(dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
......@@ -2177,7 +2193,7 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co
dbs,
args->num_DBs,
bt_compare,
args->env_args); CKERR(chk_r); }
args); CKERR(chk_r); }
if (args->warm_cache) {
do_warm_cache(env, dbs, args);
}
......@@ -2201,7 +2217,7 @@ UU() stress_recover(struct cli_args *args) {
dbs,
args->num_DBs,
stress_int_dbt_cmp,
args->env_args); CKERR(chk_r); }
args); CKERR(chk_r); }
DB_TXN* txn = NULL;
struct arg recover_args;
......
......@@ -406,20 +406,18 @@ static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, u
static int
db_use_builtin_key_cmp(DB *db) {
HANDLE_PANICKED_DB(db);
int r;
int r = 0;
if (db_opened(db))
r = toku_ydb_do_error(db->dbenv, EINVAL, "Comparison functions cannot be set after DB open.\n");
else if (db->i->key_compare_was_set)
r = toku_ydb_do_error(db->dbenv, EINVAL, "Key comparison function already set.\n");
else {
uint32_t tflags;
r = toku_ft_get_flags(db->i->ft_handle, &tflags);
if (r!=0) return r;
toku_ft_get_flags(db->i->ft_handle, &tflags);
tflags |= TOKU_DB_KEYCMP_BUILTIN;
r = toku_ft_set_flags(db->i->ft_handle, tflags);
if (!r)
db->i->key_compare_was_set = true;
toku_ft_set_flags(db->i->ft_handle, tflags);
db->i->key_compare_was_set = true;
}
return r;
}
......
......@@ -234,19 +234,19 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
int is_db_hot_index = flags & DB_IS_HOT_INDEX; unused_flags&=~DB_IS_HOT_INDEX;
//We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
unused_flags&=~DB_READ_UNCOMMITTED;
unused_flags&=~DB_READ_COMMITTED;
unused_flags&=~DB_SERIALIZABLE;
if (unused_flags & ~DB_THREAD) return EINVAL; // unknown flags
unused_flags&=~DB_READ_UNCOMMITTED;
unused_flags&=~DB_READ_COMMITTED;
unused_flags&=~DB_SERIALIZABLE;
// DB_THREAD is implicitly supported and DB_BLACKHOLE is supported at the ft-layer
unused_flags &= ~DB_THREAD;
unused_flags &= ~DB_BLACKHOLE;
// check for unknown or conflicting flags
if (unused_flags) return EINVAL; // unknown flags
if (is_db_excl && !is_db_create) return EINVAL;
if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;
/* tokudb supports no duplicates and sorted duplicates only */
unsigned int tflags;
r = toku_ft_get_flags(db->i->ft_handle, &tflags);
if (r != 0)
return r;
if (db_opened(db)) {
// it was already open
return EINVAL;
......@@ -384,17 +384,12 @@ db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, i
flags&=~DB_SERIALIZABLE;
flags&=~DB_IS_HOT_INDEX;
// unknown or conflicting flags are bad
if ((flags & ~DB_THREAD) || (is_db_excl && !is_db_create)) {
int unknown_flags = flags & ~DB_THREAD;
unknown_flags &= ~DB_BLACKHOLE;
if (unknown_flags || (is_db_excl && !is_db_create)) {
return EINVAL;
}
/* tokudb supports no duplicates and sorted duplicates only */
unsigned int tflags;
r = toku_ft_get_flags(db->i->ft_handle, &tflags);
if (r != 0) {
return r;
}
if (db_opened(db)) {
return EINVAL; /* It was already open. */
}
......@@ -411,6 +406,12 @@ db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, i
goto error_cleanup;
}
// if the dictionary was opened as a blackhole, mark the
// fractal tree as blackhole too.
if (flags & DB_BLACKHOLE) {
toku_ft_set_blackhole(ft_handle);
}
db->i->opened = 1;
// now that the handle has successfully opened, a valid descriptor
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment