Commit 1d7c484e authored by David Teigland's avatar David Teigland

dlm: use idr instead of list for recovered rsbs

When a large number of resources are being recovered,
a linear search of the recover_list takes a long time.
Use an idr in place of a list.
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent c04fecb4
...@@ -288,6 +288,7 @@ struct dlm_rsb { ...@@ -288,6 +288,7 @@ struct dlm_rsb {
int res_nodeid; int res_nodeid;
int res_master_nodeid; int res_master_nodeid;
int res_dir_nodeid; int res_dir_nodeid;
int res_id; /* for ls_recover_idr */
uint32_t res_lvbseq; uint32_t res_lvbseq;
uint32_t res_hash; uint32_t res_hash;
uint32_t res_bucket; /* rsbtbl */ uint32_t res_bucket; /* rsbtbl */
...@@ -587,6 +588,8 @@ struct dlm_ls { ...@@ -587,6 +588,8 @@ struct dlm_ls {
struct list_head ls_recover_list; struct list_head ls_recover_list;
spinlock_t ls_recover_list_lock; spinlock_t ls_recover_list_lock;
int ls_recover_list_count; int ls_recover_list_count;
struct idr ls_recover_idr;
spinlock_t ls_recover_idr_lock;
wait_queue_head_t ls_wait_general; wait_queue_head_t ls_wait_general;
struct mutex ls_clear_proc_locks; struct mutex ls_clear_proc_locks;
......
...@@ -565,6 +565,8 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -565,6 +565,8 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_recover_list); INIT_LIST_HEAD(&ls->ls_recover_list);
spin_lock_init(&ls->ls_recover_list_lock); spin_lock_init(&ls->ls_recover_list_lock);
idr_init(&ls->ls_recover_idr);
spin_lock_init(&ls->ls_recover_idr_lock);
ls->ls_recover_list_count = 0; ls->ls_recover_list_count = 0;
ls->ls_local_handle = ls; ls->ls_local_handle = ls;
init_waitqueue_head(&ls->ls_wait_general); init_waitqueue_head(&ls->ls_wait_general);
...@@ -636,6 +638,7 @@ static int new_lockspace(const char *name, const char *cluster, ...@@ -636,6 +638,7 @@ static int new_lockspace(const char *name, const char *cluster,
spin_lock(&lslist_lock); spin_lock(&lslist_lock);
list_del(&ls->ls_list); list_del(&ls->ls_list);
spin_unlock(&lslist_lock); spin_unlock(&lslist_lock);
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf); kfree(ls->ls_recover_buf);
out_lkbfree: out_lkbfree:
idr_destroy(&ls->ls_lkbidr); idr_destroy(&ls->ls_lkbidr);
......
...@@ -325,7 +325,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) ...@@ -325,7 +325,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
if (error) if (error)
goto out; goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length); memcpy(rc->rc_buf, r->res_name, r->res_length);
rc->rc_id = (unsigned long) r; rc->rc_id = (unsigned long) r->res_id;
send_rcom(ls, mh, rc); send_rcom(ls, mh, rc);
out: out:
......
...@@ -277,22 +277,6 @@ static void recover_list_del(struct dlm_rsb *r) ...@@ -277,22 +277,6 @@ static void recover_list_del(struct dlm_rsb *r)
dlm_put_rsb(r); dlm_put_rsb(r);
} }
static struct dlm_rsb *recover_list_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r = NULL;
spin_lock(&ls->ls_recover_list_lock);
list_for_each_entry(r, &ls->ls_recover_list, res_recover_list) {
if (id == (unsigned long) r)
goto out;
}
r = NULL;
out:
spin_unlock(&ls->ls_recover_list_lock);
return r;
}
static void recover_list_clear(struct dlm_ls *ls) static void recover_list_clear(struct dlm_ls *ls)
{ {
struct dlm_rsb *r, *s; struct dlm_rsb *r, *s;
...@@ -313,6 +297,94 @@ static void recover_list_clear(struct dlm_ls *ls) ...@@ -313,6 +297,94 @@ static void recover_list_clear(struct dlm_ls *ls)
spin_unlock(&ls->ls_recover_list_lock); spin_unlock(&ls->ls_recover_list_lock);
} }
static int recover_idr_empty(struct dlm_ls *ls)
{
int empty = 1;
spin_lock(&ls->ls_recover_idr_lock);
if (ls->ls_recover_list_count)
empty = 0;
spin_unlock(&ls->ls_recover_idr_lock);
return empty;
}
static int recover_idr_add(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
int rv, id;
rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
if (!rv)
return -ENOMEM;
spin_lock(&ls->ls_recover_idr_lock);
if (r->res_id) {
spin_unlock(&ls->ls_recover_idr_lock);
return -1;
}
rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
if (rv) {
spin_unlock(&ls->ls_recover_idr_lock);
return rv;
}
r->res_id = id;
ls->ls_recover_list_count++;
dlm_hold_rsb(r);
spin_unlock(&ls->ls_recover_idr_lock);
return 0;
}
static void recover_idr_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_idr_lock);
idr_remove(&ls->ls_recover_idr, r->res_id);
r->res_id = 0;
ls->ls_recover_list_count--;
spin_unlock(&ls->ls_recover_idr_lock);
dlm_put_rsb(r);
}
static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r;
spin_lock(&ls->ls_recover_idr_lock);
r = idr_find(&ls->ls_recover_idr, (int)id);
spin_unlock(&ls->ls_recover_idr_lock);
return r;
}
static int recover_idr_clear_rsb(int id, void *p, void *data)
{
struct dlm_ls *ls = data;
struct dlm_rsb *r = p;
r->res_id = 0;
r->res_recover_locks_count = 0;
ls->ls_recover_list_count--;
dlm_put_rsb(r);
return 0;
}
static void recover_idr_clear(struct dlm_ls *ls)
{
spin_lock(&ls->ls_recover_idr_lock);
idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
idr_remove_all(&ls->ls_recover_idr);
if (ls->ls_recover_list_count != 0) {
log_error(ls, "warning: recover_list_count %d",
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
spin_unlock(&ls->ls_recover_idr_lock);
}
/* Master recovery: find new master node for rsb's that were /* Master recovery: find new master node for rsb's that were
mastered on nodes that have been removed. mastered on nodes that have been removed.
...@@ -408,7 +480,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count) ...@@ -408,7 +480,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count)
set_new_master(r); set_new_master(r);
error = 0; error = 0;
} else { } else {
recover_list_add(r); recover_idr_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid); error = dlm_send_rcom_lookup(r, dir_nodeid);
} }
...@@ -493,10 +565,10 @@ int dlm_recover_masters(struct dlm_ls *ls) ...@@ -493,10 +565,10 @@ int dlm_recover_masters(struct dlm_ls *ls)
log_debug(ls, "dlm_recover_masters %u of %u", count, total); log_debug(ls, "dlm_recover_masters %u of %u", count, total);
error = dlm_wait_function(ls, &recover_list_empty); error = dlm_wait_function(ls, &recover_idr_empty);
out: out:
if (error) if (error)
recover_list_clear(ls); recover_idr_clear(ls);
return error; return error;
} }
...@@ -505,7 +577,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -505,7 +577,7 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
struct dlm_rsb *r; struct dlm_rsb *r;
int ret_nodeid, new_master; int ret_nodeid, new_master;
r = recover_list_find(ls, rc->rc_id); r = recover_idr_find(ls, rc->rc_id);
if (!r) { if (!r) {
log_error(ls, "dlm_recover_master_reply no id %llx", log_error(ls, "dlm_recover_master_reply no id %llx",
(unsigned long long)rc->rc_id); (unsigned long long)rc->rc_id);
...@@ -524,9 +596,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -524,9 +596,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
r->res_nodeid = new_master; r->res_nodeid = new_master;
set_new_master(r); set_new_master(r);
unlock_rsb(r); unlock_rsb(r);
recover_list_del(r); recover_idr_del(r);
if (recover_list_empty(ls)) if (recover_idr_empty(ls))
wake_up(&ls->ls_wait_general); wake_up(&ls->ls_wait_general);
out: out:
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment