Commit a53f3540 authored by Zardosht Kasheff, committed by Yoni Fogel

refs #5241, make completion queues in cachetable unnecessary

git-svn-id: file:///svn/toku/tokudb@45639 c7de825b-a66e-492c-adef-691d508d4ae1
parent c88f5f1a
@@ -2724,56 +2724,25 @@ int toku_cachetable_unpin_and_remove (
     // we don't want any NEW threads to try to grab the PAIR
     // lock.
     //
-    // Because we call cachetable_remove_pair and setup a completion queue,
-    // the threads that may be waiting
-    // on this PAIR lock must be careful to do NOTHING with the PAIR because
-    // it notices a completion queue. As per our analysis above, we only need
+    // Because we call cachetable_remove_pair and wait,
+    // the threads that may be waiting
+    // on this PAIR lock must be careful to do NOTHING with the PAIR.
+    // As per our analysis above, we only need
     // to make sure the checkpoint thread and get_and_pin_nonblocking do
     // nothing, and looking at those functions, it is clear they do nothing.
     //
     cachetable_remove_pair(ct, p);
     if (nb_mutex_blocked_writers(&p->value_nb_mutex)>0) {
-        struct workqueue cq;
-        workqueue_init(&cq);
-        while (nb_mutex_blocked_writers(&p->value_nb_mutex)>0) {
-            //Someone (one or more checkpoint threads) is waiting for a write lock
-            //on this pair.
-            //They are still blocked because we have not released the
-            //cachetable lock.
-            //If we freed the memory for the pair we would have dangling
-            //pointers. We need to let the other threads finish up with
-            //this pair.
-            p->cq = &cq;
-            // If anyone is waiting on write lock, let them finish.
-            cachetable_unlock(ct);
-            WORKITEM wi = NULL;
-            r = workqueue_deq(&cq, &wi, 1);
-            //Writer is now done.
-            assert(r == 0);
-            PAIR pp = (PAIR) workitem_arg(wi);
-            assert(pp == p);
-            //We are holding the write lock on the pair
-            cachetable_lock(ct);
-            assert(nb_mutex_writers(&p->value_nb_mutex) == 1);
-            // let's also assert that this PAIR was not somehow marked
-            // as pending a checkpoint. Above, when calling
-            // remove_key(), we cleared the dirty bit so that
-            // this PAIR cannot be marked for checkpoint, so let's
-            // make sure that our assumption is valid.
-            assert(!p->checkpoint_pending);
-            assert(p->attr.cache_pressure_size == 0);
-            nb_mutex_unlock(&p->value_nb_mutex);
-            // Because we assume it is just the checkpoint thread
-            // that may have been blocked (as argued above),
-            // it is safe to simply remove the PAIR from the
-            // cachetable. We don't need to write anything out.
-        }
-        p->cq = NULL;
-        workqueue_destroy(&cq);
+        toku_cond_t cond;
+        toku_cond_init(&cond, NULL);
+        nb_mutex_wait_for_users(
+            &p->value_nb_mutex,
+            ct->mutex,
+            &cond
+            );
+        toku_cond_destroy(&cond);
+        assert(!p->checkpoint_pending);
+        assert(p->attr.cache_pressure_size == 0);
     }
     // just a sanity check
     assert(nb_mutex_users(&p->disk_nb_mutex) == 0);
...
@@ -56,6 +56,15 @@ static inline void nb_mutex_unlock(NB_MUTEX nb_mutex) {
     rwlock_write_unlock(&nb_mutex->lock);
 }
 
+static inline void nb_mutex_wait_for_users(
+    NB_MUTEX nb_mutex,
+    toku_mutex_t *mutex,
+    toku_cond_t* cond
+    )
+{
+    rwlock_wait_for_users(&nb_mutex->lock, mutex, cond);
+}
+
 // returns: the number of writers who are waiting for the lock
 static inline int nb_mutex_blocked_writers(NB_MUTEX nb_mutex) {
...
@@ -36,8 +36,16 @@ struct rwlock {
     int writer;      // the number of writers
     int want_write;  // the number of blocked writers
     toku_cond_t wait_write;
+    toku_cond_t* wait_users_go_to_zero;
 };
 
+// returns: the sum of the number of readers, pending readers, writers, and
+// pending writers
+static inline int rwlock_users(RWLOCK rwlock) {
+    return rwlock->reader + rwlock->want_read + rwlock->writer + rwlock->want_write;
+}
+
 // initialize a read write lock
 static __attribute__((__unused__))
@@ -47,6 +55,7 @@ rwlock_init(RWLOCK rwlock) {
     toku_cond_init(&rwlock->wait_read, 0);
     rwlock->writer = rwlock->want_write = 0;
     toku_cond_init(&rwlock->wait_write, 0);
+    rwlock->wait_users_go_to_zero = NULL;
 }
 
 // destroy a read write lock
@@ -64,6 +73,7 @@ rwlock_destroy(RWLOCK rwlock) {
 // expects: mutex is locked
 static inline void rwlock_read_lock(RWLOCK rwlock, toku_mutex_t *mutex) {
+    assert(!rwlock->wait_users_go_to_zero);
     if (rwlock->writer || rwlock->want_write) {
         rwlock->want_read++;
         while (rwlock->writer || rwlock->want_write) {
@@ -84,12 +94,16 @@ static inline void rwlock_read_unlock(RWLOCK rwlock) {
     if (rwlock->reader == 0 && rwlock->want_write) {
         toku_cond_signal(&rwlock->wait_write);
     }
+    if (rwlock->wait_users_go_to_zero && rwlock_users(rwlock) == 0) {
+        toku_cond_signal(rwlock->wait_users_go_to_zero);
+    }
 }
 
 // obtain a write lock
 // expects: mutex is locked
 static inline void rwlock_write_lock(RWLOCK rwlock, toku_mutex_t *mutex) {
+    assert(!rwlock->wait_users_go_to_zero);
     if (rwlock->reader || rwlock->writer) {
         rwlock->want_write++;
         while (rwlock->reader || rwlock->writer) {
@@ -111,6 +125,9 @@ static inline void rwlock_write_unlock(RWLOCK rwlock) {
         toku_cond_signal(&rwlock->wait_write);
     } else if (rwlock->want_read) {
         toku_cond_broadcast(&rwlock->wait_read);
+    }
+    if (rwlock->wait_users_go_to_zero && rwlock_users(rwlock) == 0) {
+        toku_cond_signal(rwlock->wait_users_go_to_zero);
     }
 }
@@ -138,11 +155,18 @@ static inline int rwlock_writers(RWLOCK rwlock) {
     return rwlock->writer;
 }
 
-// returns: the sum of the number of readers, pending readers, writers, and
-// pending writers
-static inline int rwlock_users(RWLOCK rwlock) {
-    return rwlock->reader + rwlock->want_read + rwlock->writer + rwlock->want_write;
+static inline void rwlock_wait_for_users(
+    RWLOCK rwlock,
+    toku_mutex_t *mutex,
+    toku_cond_t* cond
+    )
+{
+    assert(!rwlock->wait_users_go_to_zero);
+    while (rwlock_users(rwlock) > 0) {
+        rwlock->wait_users_go_to_zero = cond;
+        toku_cond_wait(cond, mutex);
+    }
+    rwlock->wait_users_go_to_zero = NULL;
 }
 
 #if defined(__cplusplus) || defined(__cilkplusplus)
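For context, the hunks above cooperate as follows: rwlock_users() counts readers, writers, and waiters; the unlock paths signal wait_users_go_to_zero once that count reaches zero; and rwlock_wait_for_users() (wrapped by nb_mutex_wait_for_users()) parks the caller on a caller-supplied condition variable until the count drains. That is what lets unpin_and_remove stack-allocate a toku_cond_t instead of wiring up a completion queue. Below is a minimal, self-contained sketch of the same drain pattern using raw pthreads rather than the toku_* wrappers; the names simple_lock, sl_acquire, sl_release, and sl_wait_for_users are hypothetical and are not part of the TokuDB tree.

/* Minimal sketch (not TokuDB code) of the "wait for lock users to reach
 * zero" pattern introduced by this commit.  Build with: cc -pthread. */
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

struct simple_lock {
    pthread_mutex_t *mutex;                 // external mutex guarding all fields
    int users;                              // readers + writers + waiters
    pthread_cond_t *wait_users_go_to_zero;  // non-NULL only while a drain is pending
};

// Caller holds *mutex.  A real lock would track readers and writers
// separately; here acquire/release just bump a single user count.
static void sl_acquire(struct simple_lock *l) {
    assert(!l->wait_users_go_to_zero);      // no new users once a drain has started
    l->users++;
}

// Caller holds *mutex.  Mirrors the rwlock_*_unlock change: wake the
// drainer when the last user leaves.
static void sl_release(struct simple_lock *l) {
    l->users--;
    if (l->wait_users_go_to_zero && l->users == 0) {
        pthread_cond_signal(l->wait_users_go_to_zero);
    }
}

// Caller holds *mutex.  Mirrors rwlock_wait_for_users: sleep on the
// caller's condition variable (releasing *mutex while asleep) until the
// user count reaches zero.
static void sl_wait_for_users(struct simple_lock *l, pthread_cond_t *cond) {
    assert(!l->wait_users_go_to_zero);
    while (l->users > 0) {
        l->wait_users_go_to_zero = cond;
        pthread_cond_wait(cond, l->mutex);
    }
    l->wait_users_go_to_zero = NULL;
}

static pthread_mutex_t big_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct simple_lock lk = { &big_mutex, 0, NULL };

// Drops the reference taken in main() a little later, from another thread.
static void *releaser_thread(void *arg) {
    (void) arg;
    usleep(100 * 1000);                     // pretend to finish using the object
    pthread_mutex_lock(&big_mutex);
    sl_release(&lk);                        // last user gone: wakes the waiter
    pthread_mutex_unlock(&big_mutex);
    return NULL;
}

int main(void) {
    // Simulate an in-flight user of the object we want to tear down.
    pthread_mutex_lock(&big_mutex);
    sl_acquire(&lk);
    pthread_mutex_unlock(&big_mutex);

    pthread_t t;
    pthread_create(&t, NULL, releaser_thread, NULL);

    // The teardown path: wait, with a stack-allocated condition variable,
    // until every user has let go; only then is it safe to free the object.
    pthread_cond_t cond;
    pthread_cond_init(&cond, NULL);
    pthread_mutex_lock(&big_mutex);
    sl_wait_for_users(&lk, &cond);
    assert(lk.users == 0);
    pthread_mutex_unlock(&big_mutex);
    pthread_cond_destroy(&cond);

    printf("all users drained; safe to free the object\n");
    pthread_join(t, NULL);
    return 0;
}

As in the patch, only one drainer may be outstanding at a time (enforced by the asserts on wait_users_go_to_zero), which is why the waiter can own the condition variable on its stack and destroy it as soon as the drain completes.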