Commit 2d903540 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: merge toss and keep hash table lists into one list

There are several places where lock processing can perform two hash table
lookups, first in the "keep" list, and if not found, in the "toss" list.
This patch introduces a new rsb state flag "RSB_TOSS" to represent the
difference between the state of being on keep vs toss list, so that the
two lists can be combined.  This avoids cases of two lookups.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent dcdaad05
......@@ -450,12 +450,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
if (seq->op == &format4_seq_ops)
ri->format = 4;
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
tree = &ls->ls_rsbtbl[bucket].r;
spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(tree); node; node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
if (toss) {
if (!rsb_flag(r, RSB_TOSS))
continue;
} else {
if (rsb_flag(r, RSB_TOSS))
continue;
}
if (!entry--) {
dlm_hold_rsb(r);
ri->rsb = r;
......@@ -482,12 +490,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
kfree(ri);
return NULL;
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
tree = &ls->ls_rsbtbl[bucket].r;
spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode);
if (toss) {
if (!rsb_flag(r, RSB_TOSS))
continue;
} else {
if (rsb_flag(r, RSB_TOSS))
continue;
}
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
......@@ -548,12 +564,19 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
++*pos;
return NULL;
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
tree = &ls->ls_rsbtbl[bucket].r;
spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode);
if (toss) {
if (!rsb_flag(r, RSB_TOSS))
continue;
} else {
if (rsb_flag(r, RSB_TOSS))
continue;
}
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
......
......@@ -205,12 +205,8 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
if (!rv)
return r;
......
......@@ -103,8 +103,7 @@ do { \
#define DLM_RTF_SHRINK_BIT 0
struct dlm_rsbtable {
struct rb_root keep;
struct rb_root toss;
struct rb_root r;
unsigned long flags;
};
......@@ -376,6 +375,7 @@ enum rsb_flags {
RSB_RECOVER_CONVERT,
RSB_RECOVER_GRANT,
RSB_RECOVER_LVB_INVAL,
RSB_TOSS,
};
static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
......
......@@ -616,23 +616,22 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
if (error)
goto do_toss;
goto do_new;
/*
* rsb is active, so we can't check master_nodeid without lock_rsb.
*/
if (rsb_flag(r, RSB_TOSS))
goto do_toss;
kref_get(&r->res_ref);
goto out_unlock;
do_toss:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
goto do_new;
/*
* rsb found inactive (master_nodeid may be out of date unless
* we are the dir_nodeid or were the master) No other thread
......@@ -669,8 +668,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
r->res_first_lkid = 0;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
rsb_clear_flag(r, RSB_TOSS);
goto out_unlock;
......@@ -731,7 +729,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
out_add:
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
out_unlock:
spin_unlock_bh(&ls->ls_rsbtbl_lock);
out:
......@@ -760,8 +758,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
if (error)
goto do_new;
if (rsb_flag(r, RSB_TOSS))
goto do_toss;
/*
......@@ -773,10 +774,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
do_toss:
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
goto do_new;
/*
* rsb found inactive. No other thread is using this rsb because
* it's on the toss list, so we can look at or update
......@@ -804,8 +801,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
r->res_nodeid = 0;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
rsb_clear_flag(r, RSB_TOSS);
goto out_unlock;
......@@ -829,7 +825,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
kref_init(&r->res_ref);
error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
out_unlock:
spin_unlock_bh(&ls->ls_rsbtbl_lock);
out:
......@@ -1049,8 +1045,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
return error;
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
if (!error) {
if (rsb_flag(r, RSB_TOSS))
goto do_toss;
/* because the rsb is active, we need to lock_rsb before
* checking/changing re_master_nodeid
*/
......@@ -1067,12 +1066,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
put_rsb(r);
return 0;
}
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
} else {
goto not_found;
}
do_toss:
/* because the rsb is inactive (on toss list), it's not refcounted
* and lock_rsb is not used, but is protected by the rsbtbl lock
*/
......@@ -1102,8 +1100,9 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_nodeid = from_nodeid;
kref_init(&r->res_ref);
r->res_toss_time = jiffies;
rsb_set_flag(r, RSB_TOSS);
error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
if (error) {
/* should never happen */
dlm_free_rsb(r);
......@@ -1127,8 +1126,11 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (rsb_flag(r, RSB_TOSS))
continue;
if (r->res_hash == hash)
dlm_dump_rsb(r);
}
......@@ -1146,14 +1148,10 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
if (!error)
goto out_dump;
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
if (error)
goto out;
out_dump:
dlm_dump_rsb(r);
out:
spin_unlock_bh(&ls->ls_rsbtbl_lock);
......@@ -1166,8 +1164,8 @@ static void toss_rsb(struct kref *kref)
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
kref_init(&r->res_ref);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
WARN_ON(rsb_flag(r, RSB_TOSS));
rsb_set_flag(r, RSB_TOSS);
r->res_toss_time = jiffies;
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
if (r->res_lvbptr) {
......@@ -1627,9 +1625,11 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
return;
}
for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!rsb_flag(r, RSB_TOSS))
continue;
/* If we're the directory record for this rsb, and
we're not the master of it, then we need to wait
......@@ -1672,7 +1672,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
dlm_free_rsb(r);
}
......@@ -1696,8 +1696,14 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
len = ls->ls_remove_lens[i];
spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
if (rv) {
spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_error(ls, "remove_name not found %s", name);
continue;
}
if (!rsb_flag(r, RSB_TOSS)) {
spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_debug(ls, "remove_name not toss %s", name);
continue;
......@@ -1734,7 +1740,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
continue;
}
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
send_remove(r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
......@@ -4202,17 +4208,16 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
if (rv) {
/* verify the rsb is on keep list per comment above */
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
if (rv) {
/* should not happen */
log_error(ls, "receive_remove from %d not found %s",
from_nodeid, name);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
/* should not happen */
log_error(ls, "%s from %d not found %s", __func__,
from_nodeid, name);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
if (!rsb_flag(r, RSB_TOSS)) {
if (r->res_master_nodeid != from_nodeid) {
/* should not happen */
log_error(ls, "receive_remove keep from %d master %d",
......@@ -4238,7 +4243,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
}
if (kref_put(&r->res_ref, kill_rsb)) {
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
} else {
......@@ -5314,8 +5319,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
struct dlm_rsb *r;
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (rsb_flag(r, RSB_TOSS))
continue;
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
......
......@@ -503,8 +503,7 @@ static int new_lockspace(const char *name, const char *cluster,
if (!ls->ls_rsbtbl)
goto out_lsfree;
for (i = 0; i < size; i++) {
ls->ls_rsbtbl[i].keep.rb_node = NULL;
ls->ls_rsbtbl[i].toss.rb_node = NULL;
ls->ls_rsbtbl[i].r.rb_node = NULL;
}
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
......@@ -837,15 +836,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
*/
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].keep);
dlm_free_rsb(rsb);
}
while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].toss);
rb_erase(n, &ls->ls_rsbtbl[i].r);
dlm_free_rsb(rsb);
}
}
......
......@@ -888,10 +888,13 @@ void dlm_clear_toss(struct dlm_ls *ls)
spin_lock(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
rb_erase(n, &ls->ls_rsbtbl[i].toss);
if (!rsb_flag(r, RSB_TOSS))
continue;
rb_erase(n, &ls->ls_rsbtbl[i].r);
dlm_free_rsb(r);
count++;
}
......
......@@ -35,9 +35,9 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (r->res_nodeid)
if (rsb_flag(r, RSB_TOSS) || r->res_nodeid)
continue;
list_add(&r->res_masters_list, &ls->ls_masters_list);
......@@ -70,14 +70,14 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
spin_lock_bh(&ls->ls_rsbtbl_lock);
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (WARN_ON_ONCE(rsb_flag(r, RSB_TOSS)))
continue;
list_add(&r->res_root_list, root_list);
dlm_hold_rsb(r);
}
if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
log_error(ls, "%s toss not empty", __func__);
}
spin_unlock_bh(&ls->ls_rsbtbl_lock);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment