Commit 6ee0cf44 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5978, merge to main

git-svn-id: file:///svn/toku/tokudb@53062 c7de825b-a66e-492c-adef-691d508d4ae1
parent fba83125
......@@ -150,6 +150,11 @@ struct ctpair {
// protected by PAIR->mutex
uint32_t count; // clock count
uint32_t refcount; // if > 0, then this PAIR is referenced by
// callers to the cachetable, and therefore cannot
// be evicted
uint32_t num_waiting_on_refs; // number of threads waiting on refcount to go to zero
toku_cond_t refcount_wait; // cond used to wait for refcount to go to zero
// locks
toku::frwlock value_rwlock;
......
......@@ -86,7 +86,9 @@ static PAIR_ATTR const zero_attr = {
static inline void ctpair_destroy(PAIR p) {
p->value_rwlock.deinit();
paranoid_invariant(p->refcount == 0);
nb_mutex_destroy(&p->disk_nb_mutex);
toku_cond_destroy(&p->refcount_wait);
toku_free(p);
}
......@@ -98,6 +100,30 @@ static inline void pair_unlock(PAIR p) {
toku_mutex_unlock(p->mutex);
}
// adds a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_add_ref_unlocked(PAIR p) {
p->refcount++;
}
// releases a reference to the PAIR
// on input and output, PAIR mutex is held
static void pair_release_ref_unlocked(PAIR p) {
paranoid_invariant(p->refcount > 0);
p->refcount--;
if (p->refcount == 0 && p->num_waiting_on_refs > 0) {
toku_cond_broadcast(&p->refcount_wait);
}
}
static void pair_wait_for_ref_release_unlocked(PAIR p) {
p->num_waiting_on_refs++;
while (p->refcount > 0) {
toku_cond_wait(&p->refcount_wait, p->mutex);
}
p->num_waiting_on_refs--;
}
bool toku_ctpair_is_write_locked(PAIR pair) {
return pair->value_rwlock.writers() == 1;
}
......@@ -573,7 +599,7 @@ static void cachetable_maybe_remove_and_free_pair (
)
{
// this ensures that a clone running in the background first completes
if (p->value_rwlock.users() == 0) {
if (p->value_rwlock.users() == 0 && p->refcount == 0) {
// assumption is that if we are about to remove the pair
// that no one has grabbed the disk_nb_mutex,
// and that there is no cloned_value_data, because
......@@ -760,6 +786,9 @@ void pair_init(PAIR p,
p->write_extraargs = write_callback.write_extraargs;
p->count = 0; // <CER> Is zero the correct init value?
p->refcount = 0;
p->num_waiting_on_refs = 0;
toku_cond_init(&p->refcount_wait, NULL);
p->checkpoint_pending = false;
p->mutex = list->get_mutex_for_pair(fullhash);
......@@ -1851,7 +1880,6 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
//
static int
cachetable_unpin_internal(
PAIR locked_p,
CACHEFILE cachefile,
PAIR p,
enum cachetable_dirty dirty,
......@@ -1865,9 +1893,7 @@ cachetable_unpin_internal(
bool added_data_to_cachetable = false;
// hack for #3969, only exists in case where we run unlockers
if (!locked_p || locked_p->mutex != p->mutex) {
pair_lock(p);
}
pair_lock(p);
PAIR_ATTR old_attr = p->attr;
PAIR_ATTR new_attr = attr;
if (dirty) {
......@@ -1878,9 +1904,7 @@ cachetable_unpin_internal(
}
bool read_lock_grabbed = p->value_rwlock.readers() != 0;
unpin_pair(p, read_lock_grabbed);
if (!locked_p || locked_p->mutex != p->mutex) {
pair_unlock(p);
}
pair_unlock(p);
if (attr.is_valid) {
if (new_attr.size > old_attr.size) {
......@@ -1902,18 +1926,18 @@ cachetable_unpin_internal(
}
int toku_cachetable_unpin(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(NULL, cachefile, p, dirty, attr, true);
return cachetable_unpin_internal(cachefile, p, dirty, attr, true);
}
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR locked_p, CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(locked_p, cachefile, p, dirty, attr, false);
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, PAIR p, enum cachetable_dirty dirty, PAIR_ATTR attr) {
return cachetable_unpin_internal(cachefile, p, dirty, attr, false);
}
static void
run_unlockers (PAIR p, UNLOCKERS unlockers) {
run_unlockers (UNLOCKERS unlockers) {
while (unlockers) {
assert(unlockers->locked);
unlockers->locked = false;
unlockers->f(p, unlockers->extra);
unlockers->f(unlockers->extra);
unlockers=unlockers->next;
}
}
......@@ -1944,19 +1968,27 @@ maybe_pin_pair(
// run the unlockers, as we intend to return the value to the user
if (lock_type == PL_READ) {
if (p->value_rwlock.read_lock_is_expensive()) {
run_unlockers(p, unlockers);
pair_add_ref_unlocked(p);
pair_unlock(p);
run_unlockers(unlockers);
retval = TOKUDB_TRY_AGAIN;
pair_lock(p);
pair_release_ref_unlocked(p);
}
p->value_rwlock.read_lock();
}
else if (lock_type == PL_WRITE_EXPENSIVE || lock_type == PL_WRITE_CHEAP){
if (p->value_rwlock.write_lock_is_expensive()) {
run_unlockers(p, unlockers);
pair_add_ref_unlocked(p);
pair_unlock(p);
run_unlockers(unlockers);
// change expensive to false because
// we will unpin the pair immedietely
// after pinning it
expensive = false;
retval = TOKUDB_TRY_AGAIN;
pair_lock(p);
pair_release_ref_unlocked(p);
}
p->value_rwlock.write_lock(expensive);
}
......@@ -2036,7 +2068,7 @@ try_again:
// will not block.
p->value_rwlock.write_lock(true);
pair_unlock(p);
run_unlockers(NULL, unlockers); // we hold the write list_lock.
run_unlockers(unlockers); // we hold the write list_lock.
ct->list.write_list_unlock();
// at this point, only the pair is pinned,
......@@ -2075,7 +2107,7 @@ try_again:
// still check for partial fetch
bool partial_fetch_required = pf_req_callback(p->value_data,read_extraargs);
if (partial_fetch_required) {
run_unlockers(NULL, unlockers);
run_unlockers(unlockers);
// we are now getting an expensive write lock, because we
// are doing a partial fetch. So, if we previously have
......@@ -2302,6 +2334,8 @@ void toku_cachetable_verify (CACHETABLE ct) {
ct->list.verify();
}
struct pair_flush_for_close{
PAIR p;
BACKGROUND_JOB_MANAGER bjm;
......@@ -2406,6 +2440,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
assert(!p->cloned_value_data);
assert(p->dirty == CACHETABLE_CLEAN);
assert(p->refcount == 0);
// TODO: maybe break up this function
// so that write lock does not need to be held for entire
// free
......@@ -2481,7 +2516,7 @@ int toku_test_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, uint32_t fullh
int toku_test_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE cachefile, CACHEKEY key, uint32_t fullhash, enum cachetable_dirty dirty, PAIR_ATTR attr) {
// We hold the cachetable mutex.
PAIR p = test_get_pair(cachefile, key, fullhash, true);
return toku_cachetable_unpin_ct_prelocked_no_flush(NULL, cachefile, p, dirty, attr);
return toku_cachetable_unpin_ct_prelocked_no_flush(cachefile, p, dirty, attr);
}
//test-only wrapper
......@@ -2597,11 +2632,15 @@ int toku_cachetable_unpin_and_remove (
//
cachetable_remove_pair(&ct->list, &ct->ev, p);
ct->list.write_list_unlock();
if (p->refcount > 0) {
pair_wait_for_ref_release_unlocked(p);
}
if (p->value_rwlock.users() > 0) {
// Need to wait for everyone else to leave
// This write lock will be granted only after all waiting
// threads are done.
p->value_rwlock.write_lock(true);
assert(p->refcount == 0);
assert(p->value_rwlock.users() == 1); // us
assert(!p->checkpoint_pending);
assert(p->attr.cache_pressure_size == 0);
......@@ -3745,7 +3784,13 @@ bool evictor::run_eviction_on_pair(PAIR curr_in_clock) {
goto exit;
}
pair_lock(curr_in_clock);
// these are the circumstances under which we don't run eviction on a pair:
// - if other users are waiting on the lock
// - if the PAIR is referenced by users
// - if the PAIR's disk_nb_mutex is in use, implying that it is
// undergoing a checkpoint
if (curr_in_clock->value_rwlock.users() ||
curr_in_clock->refcount > 0 ||
nb_mutex_users(&curr_in_clock->disk_nb_mutex))
{
pair_unlock(curr_in_clock);
......
......@@ -326,7 +326,7 @@ void toku_cachetable_pf_pinned_pair(
struct unlockers {
bool locked;
void (*f)(PAIR p, void* extra);
void (*f)(void* extra);
void *extra;
UNLOCKERS next;
};
......@@ -385,7 +385,7 @@ int toku_cachetable_unpin(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATT
// Returns: 0 if success, otherwise returns an error number.
// Requires: The ct is locked.
int toku_cachetable_unpin_ct_prelocked_no_flush(PAIR, CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
int toku_cachetable_unpin_ct_prelocked_no_flush(CACHEFILE, PAIR, enum cachetable_dirty dirty, PAIR_ATTR size);
// Effect: The same as tokud_cachetable_unpin, except that the ct must not be locked.
// Requires: The ct is NOT locked.
......
......@@ -4789,14 +4789,13 @@ struct unlock_ftnode_extra {
};
// When this is called, the cachetable lock is held
static void
unlock_ftnode_fun (PAIR p, void *v) {
unlock_ftnode_fun (void *v) {
struct unlock_ftnode_extra *x = NULL;
CAST_FROM_VOIDP(x, v);
FT_HANDLE brt = x->ft_handle;
FTNODE node = x->node;
// CT lock is held
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
p,
brt->ft->cf,
node->ct_pair,
(enum cachetable_dirty) node->dirty,
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-verify.cc 52748 2013-01-31 21:12:42Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "test.h"
//
// This test verifies that if a node is pinned by a thread
// doing get_and_pin_nonblocking while another thread is trying
// to unpin_and_remove it, that nothing bad happens.
//
CACHEFILE f1;
PAIR p1;
PAIR p2;
static int
fetch_one(CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 1);
p1 = p;
return 0;
}
static int
fetch_two (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 2);
p2 = p;
return 0;
}
toku_pthread_t unpin_and_remove_tid;
static void *unpin_and_remove_one(void *UU(arg)) {
int r = toku_cachetable_unpin_and_remove(
f1,
p1,
NULL,
NULL
);
assert_zero(r);
return arg;
}
static void
unpin_two (void* UU(v)) {
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
f1,
p2,
CACHETABLE_DIRTY,
make_pair_attr(8)
);
assert_zero(r);
// at this point, we have p1 pinned, want to start a thread to do an unpin_and_remove
// on p1
r = toku_pthread_create(
&unpin_and_remove_tid,
NULL,
unpin_and_remove_one,
NULL
);
assert_zero(r);
// sleep to give a chance for the unpin_and_remove to get going
usleep(512*1024);
}
static void *repin_one(void *UU(arg)) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
struct unlockers unlockers = {true, unpin_two, NULL, NULL};
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
1,
&v1,
&s1,
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&unlockers
);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void
cachetable_test (void) {
const int test_limit = 1000;
int r;
toku_pair_list_set_lock_size(2); // set two bucket mutexes
CACHETABLE ct;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
// bring pairs 1 and 2 into memory, then unpin
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, fetch_one, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v1, &s1, wc, fetch_two, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
toku_pthread_t tid1;
r = toku_pthread_create(&tid1, NULL, repin_one, NULL);
assert_zero(r);
void *ret;
r = toku_pthread_join(tid1, &ret);
assert_zero(r);
r = toku_pthread_join(unpin_and_remove_tid, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
// test ought to run bunch of times in hope of hitting bug
uint32_t num_test_runs = 1;
for (uint32_t i = 0; i < num_test_runs; i++) {
if (verbose) {
printf("starting test run %" PRIu32 " \n", i);
}
cachetable_test();
}
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id: cachetable-simple-verify.cc 52748 2013-01-31 21:12:42Z leifwalsh $"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include "test.h"
//
// This test verifies the behavior that originally caused
// #5978 is fixed. Here is what we do. We have four pairs with
// blocknums and fullhashes of 1,2,3,4. The cachetable has only
// two bucket mutexes, so 1 and 3 share a pair mutex, as do 2 and 4.
// We pin all four with expensive write locks. Then, on backgroud threads,
// we call get_and_pin_nonblocking on 3, where the unlockers unpins 2, and
// we call get_and_pin_nonblocking on 4, where the unlockers unpins 1. Run this
// enough times, and we should see a deadlock before the fix, and no deadlock
// after the fix.
//
CACHEFILE f1;
PAIR p3;
PAIR p4;
static int
fetch_three (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 3);
p3 = p;
return 0;
}
static int
fetch_four (CACHEFILE f __attribute__((__unused__)),
PAIR UU(p),
int UU(fd),
CACHEKEY k __attribute__((__unused__)),
uint32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
void **dd __attribute__((__unused__)),
PAIR_ATTR *sizep __attribute__((__unused__)),
int *dirtyp,
void *extraargs __attribute__((__unused__))
) {
*dirtyp = 0;
*value = NULL;
*sizep = make_pair_attr(8);
assert(k.b == 4);
p4 = p;
return 0;
}
static void
unpin_four (void* UU(v)) {
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
f1,
p3,
CACHETABLE_DIRTY,
make_pair_attr(8)
);
assert_zero(r);
}
static void
unpin_three (void* UU(v)) {
int r = toku_cachetable_unpin_ct_prelocked_no_flush(
f1,
p4,
CACHETABLE_DIRTY,
make_pair_attr(8)
);
assert_zero(r);
}
static void *repin_one(void *UU(arg)) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
struct unlockers unlockers = {true, unpin_four, NULL, NULL};
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(1),
1,
&v1,
&s1,
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&unlockers
);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void *repin_two(void *UU(arg)) {
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
struct unlockers unlockers = {true, unpin_three, NULL, NULL};
void* v1;
long s1;
int r = toku_cachetable_get_and_pin_nonblocking(
f1,
make_blocknum(2),
2,
&v1,
&s1,
wc,
def_fetch,
def_pf_req_callback,
def_pf_callback,
PL_WRITE_EXPENSIVE,
NULL,
&unlockers
);
assert(r == TOKUDB_TRY_AGAIN);
return arg;
}
static void
cachetable_test (void) {
const int test_limit = 1000;
int r;
toku_pair_list_set_lock_size(2); // set two bucket mutexes
CACHETABLE ct;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
const char *fname1 = TOKU_TEST_FILENAME;
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
void* v1;
long s1;
CACHETABLE_WRITE_CALLBACK wc = def_write_callback(NULL);
// bring pairs 1 and 2 into memory, then unpin
r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v1, &s1, wc, def_fetch, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
// now pin pairs 3 and 4
r = toku_cachetable_get_and_pin(f1, make_blocknum(3), 3, &v1, &s1, wc, fetch_three, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
r = toku_cachetable_get_and_pin(f1, make_blocknum(4), 4, &v1, &s1, wc, fetch_four, def_pf_req_callback, def_pf_callback, true, NULL);
assert_zero(r);
toku_pthread_t tid1;
toku_pthread_t tid2;
r = toku_pthread_create(&tid1, NULL, repin_one, NULL);
assert_zero(r);
r = toku_pthread_create(&tid2, NULL, repin_two, NULL);
assert_zero(r);
// unpin 1 and 2 so tid1 and tid2 can make progress
usleep(512*1024);
r = toku_test_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
r = toku_test_cachetable_unpin(f1, make_blocknum(2), 2, CACHETABLE_DIRTY, make_pair_attr(8));
assert_zero(r);
void *ret;
r = toku_pthread_join(tid1, &ret);
assert_zero(r);
r = toku_pthread_join(tid2, &ret);
assert_zero(r);
toku_cachetable_verify(ct);
toku_cachefile_close(&f1, false, ZERO_LSN);
toku_cachetable_close(&ct);
}
int
test_main(int argc, const char *argv[]) {
default_parse_args(argc, argv);
// test ought to run bunch of times in hope of hitting bug
uint32_t num_test_runs = 30;
for (uint32_t i = 0; i < num_test_runs; i++) {
if (verbose) {
printf("starting test run %" PRIu32 " \n", i);
}
cachetable_test();
}
return 0;
}
......@@ -27,7 +27,7 @@ static void kibbutz_work(void *fe_v)
}
static void
unlock_dummy (PAIR UU(p), void* UU(v)) {
unlock_dummy (void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
......
......@@ -35,7 +35,7 @@ static void kibbutz_work(void *fe_v)
}
static void
unlock_dummy (PAIR UU(p), void* UU(v)) {
unlock_dummy (void* UU(v)) {
}
static void reset_unlockers(UNLOCKERS unlockers) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment