Commit 8f9d5d86 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

[t:5113] carefully document a non-obvious contract requirement for change...

[t:5113] carefully document a non-obvious contract requirement for change descriptor. clients may ONLY update a FT's cmp descriptor if it's the first operation done after open, before any reads or writes. further, remove the test - it's user error.


git-svn-id: file:///svn/toku/tokudb@44804 c7de825b-a66e-492c-adef-691d508d4ae1
parent c441dd7f
......@@ -30,11 +30,16 @@
typedef int(*FT_GET_CALLBACK_FUNCTION)(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *extra, bool lock_only);
int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *, int nodesize, int basementnodesize, enum toku_compression_method compression_method, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*)) __attribute__ ((warn_unused_result));
int toku_ft_change_descriptor(FT_HANDLE t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn, BOOL update_cmp_descriptor);
// See the ft-ops.c file for what this toku_redirect_ft does
// effect: changes the descriptor for the ft of the given handle.
// requires:
// - cannot change descriptor for same ft in two threads in parallel.
// - can only update cmp descriptor immidiately after opening the FIRST ft handle for this ft and before
// ANY operations. to update the cmp descriptor after any operations have already happened, all handles
// and transactions must close and reopen before the change, then you can update the cmp descriptor
int toku_ft_change_descriptor(FT_HANDLE t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn, BOOL update_cmp_descriptor);
u_int32_t toku_serialize_descriptor_size(const DESCRIPTOR desc);
int toku_ft_handle_create(FT_HANDLE *) __attribute__ ((warn_unused_result));
int toku_ft_set_flags(FT_HANDLE, unsigned int flags) __attribute__ ((warn_unused_result));
int toku_ft_get_flags(FT_HANDLE, unsigned int *flags) __attribute__ ((warn_unused_result));
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
// test that inserts a bunch of rows, induces background flushes, and randomly
// changes descriptor every so often. if background threads don't synchronize
// with the descriptor change, there could be a data race on the descriptor's
// pointer value in the FT.
#include "test.h"
#include <portability/toku_pthread.h>
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
static DB *db;
static DB_ENV *env;
static int desc_magic;
static toku_pthread_rwlock_t rwlock;
static int
int_cmp(DB *cmpdb, const DBT *a, const DBT *b) {
assert(a);
assert(b);
assert(a->size == sizeof(int));
assert(b->size == sizeof(int));
assert(cmpdb->cmp_descriptor->dbt.size == sizeof(int));
int magic = *(int *) cmpdb->cmp_descriptor->dbt.data;
toku_pthread_rwlock_rdlock(&rwlock);
if (magic != desc_magic) {
printf("got magic %d, wanted desc_magic %d\n", magic, desc_magic);
}
assert(magic == desc_magic);
toku_pthread_rwlock_rdunlock(&rwlock);
int x = *(int *) a->data;
int y = *(int *) b->data;
if (x<y) return -1;
if (x>y) return 1;
return 0;
}
static void setup (void) {
int r;
{ int chk_r = system("rm -rf " ENVDIR); CKERR(chk_r); }
{ int chk_r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(chk_r); }
{ int chk_r = db_env_create(&env, 0); CKERR(chk_r); }
env->set_errfile(env, stderr);
r = env->set_default_bt_compare(env, int_cmp); CKERR(r);
//r = env->set_cachesize(env, 0, 50000, 1); CKERR(r);
{ int chk_r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(chk_r); }
{ int chk_r = db_create(&db, env, 0); CKERR(chk_r); }
//r = db->set_pagesize(db, 2048); CKERR(r);
//r = db->set_readpagesize(db, 1024); CKERR(r);
IN_TXN_COMMIT(env, NULL, txn, 0, {
{ int chk_r = db->open(db, txn, "foo.db", NULL, DB_BTREE, DB_CREATE, 0666); CKERR(chk_r); }
});
}
static void
cleanup(void) {
{ int chk_r = db->close(db, 0); CKERR(chk_r); }
{ int chk_r = env->close(env, 0); CKERR(chk_r); }
}
static void
next_descriptor(void) {
toku_pthread_rwlock_wrlock(&rwlock);
IN_TXN_COMMIT(env, NULL, txn, 0, {
// get a new magic value
desc_magic++;
DBT desc_dbt;
dbt_init(&desc_dbt, &desc_magic, sizeof(int));
{ int chk_r = db->change_descriptor(db, txn, &desc_dbt, DB_UPDATE_CMP_DESCRIPTOR); CKERR(chk_r); }
});
toku_pthread_rwlock_wrunlock(&rwlock);
}
static void
insert_change_descriptor_stress(void) {
const int num_changes = 1000;
const int inserts_per_change = 1000;
const int valsize = 200 - sizeof(int);
// bigger rows cause more flushes
DBT key, value;
char valbuf[valsize];
memset(valbuf, 0, valsize);
dbt_init(&value, valbuf, valsize);
// do a bunch of inserts before changing the descriptor
// the idea is that the inserts will cause flusher threads
// which might race with the descriptor change. there's no
// contract violation because we're not inserting and changing
// the descriptor in parallel.
for (int i = 0; i < num_changes; i++) {
next_descriptor();
IN_TXN_COMMIT(env, NULL, txn, 0, {
for (int j = 0; j < inserts_per_change; j++) {
int k = random64();
dbt_init(&key, &k, sizeof(int));
int r = db->put(db, txn, &key, &value, 0); CKERR(r);
}
});
}
}
int
test_main(int argc, char * const argv[]) {
parse_args(argc, argv);
setup();
insert_change_descriptor_stress();
cleanup();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment