Commit 6c648035 authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: switch to use rhashtable for rsbs

Replace our own hash table with the more advanced rhashtable
for keeping rsb structs.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 93a693d1
......@@ -63,6 +63,14 @@ static void release_node(struct config_item *);
static struct configfs_attribute *comm_attrs[];
static struct configfs_attribute *node_attrs[];
const struct rhashtable_params dlm_rhash_rsb_params = {
.nelem_hint = 3, /* start small */
.key_len = DLM_RESNAME_MAXLEN,
.key_offset = offsetof(struct dlm_rsb, res_name),
.head_offset = offsetof(struct dlm_rsb, res_node),
.automatic_shrinking = true,
};
struct dlm_cluster {
struct config_group group;
unsigned int cl_tcp_port;
......
......@@ -21,6 +21,8 @@ struct dlm_config_node {
uint32_t comm_seq;
};
extern const struct rhashtable_params dlm_rhash_rsb_params;
#define DLM_MAX_ADDR_COUNT 3
#define DLM_PROTO_TCP 0
......
......@@ -198,14 +198,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
int len)
{
struct dlm_rsb *r;
uint32_t hash, bucket;
int rv;
hash = jhash(name, len, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
if (!rv)
return r;
......
......@@ -34,6 +34,7 @@
#include <linux/kernel.h>
#include <linux/jhash.h>
#include <linux/miscdevice.h>
#include <linux/rhashtable.h>
#include <linux/mutex.h>
#include <linux/idr.h>
#include <linux/ratelimit.h>
......@@ -99,15 +100,6 @@ do { \
} \
}
#define DLM_RTF_SHRINK_BIT 0
struct dlm_rsbtable {
struct rb_root r;
unsigned long flags;
};
/*
* Lockspace member (per node in a ls)
*/
......@@ -327,13 +319,12 @@ struct dlm_rsb {
int res_id; /* for ls_recover_idr */
uint32_t res_lvbseq;
uint32_t res_hash;
uint32_t res_bucket; /* rsbtbl */
unsigned long res_toss_time;
uint32_t res_first_lkid;
struct list_head res_lookup; /* lkbs waiting on first */
union {
struct list_head res_hashchain;
struct rb_node res_hashnode; /* rsbtbl */
struct rhash_head res_node; /* rsbtbl */
};
struct list_head res_grantqueue;
struct list_head res_convertqueue;
......@@ -592,9 +583,10 @@ struct dlm_ls {
struct idr ls_lkbidr;
spinlock_t ls_lkbidr_spin;
struct dlm_rsbtable *ls_rsbtbl;
struct rhashtable ls_rsbtbl;
#define DLM_RTF_SHRINK_BIT 0
unsigned long ls_rsbtbl_flags;
spinlock_t ls_rsbtbl_lock;
uint32_t ls_rsbtbl_size;
struct list_head ls_toss;
struct list_head ls_keep;
......
......@@ -436,8 +436,6 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
list_del(&r->res_hashchain);
/* Convert the empty list_head to a NULL rb_node for tree usage: */
memset(&r->res_hashnode, 0, sizeof(struct rb_node));
ls->ls_new_rsb_count--;
spin_unlock_bh(&ls->ls_new_rsb_spin);
......@@ -458,67 +456,31 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
return 0;
}
static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
char maxname[DLM_RESNAME_MAXLEN];
memset(maxname, 0, DLM_RESNAME_MAXLEN);
memcpy(maxname, name, nlen);
return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
struct dlm_rsb **r_ret)
{
struct rb_node *node = tree->rb_node;
struct dlm_rsb *r;
int rc;
while (node) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
rc = rsb_cmp(r, name, len);
if (rc < 0)
node = node->rb_left;
else if (rc > 0)
node = node->rb_right;
else
goto found;
}
*r_ret = NULL;
return -EBADR;
char key[DLM_RESNAME_MAXLEN] = {};
found:
*r_ret = r;
memcpy(key, name, len);
*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
if (*r_ret)
return 0;
return -EBADR;
}
static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
struct rb_node **newn = &tree->rb_node;
struct rb_node *parent = NULL;
int rc;
while (*newn) {
struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
res_hashnode);
int rv;
parent = *newn;
rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
if (rc < 0)
newn = &parent->rb_left;
else if (rc > 0)
newn = &parent->rb_right;
else {
log_print("rsb_insert match");
rv = rhashtable_insert_fast(rhash, &rsb->res_node,
dlm_rhash_rsb_params);
if (rv == -EEXIST) {
log_print("%s match", __func__);
dlm_dump_rsb(rsb);
dlm_dump_rsb(cur);
return -EEXIST;
}
}
rb_link_node(&rsb->res_hashnode, parent, newn);
rb_insert_color(&rsb->res_hashnode, tree);
return 0;
return rv;
}
/*
......@@ -566,8 +528,7 @@ static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
*/
static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
uint32_t hash, uint32_t b,
int dir_nodeid, int from_nodeid,
uint32_t hash, int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r = NULL;
......@@ -616,7 +577,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto do_new;
......@@ -690,7 +651,6 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
goto out_unlock;
r->res_hash = hash;
r->res_bucket = b;
r->res_dir_nodeid = dir_nodeid;
kref_init(&r->res_ref);
......@@ -730,7 +690,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
}
out_add:
error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
error = rsb_insert(r, &ls->ls_rsbtbl);
if (!error)
list_add(&r->res_rsbs_list, &ls->ls_keep);
out_unlock:
......@@ -745,8 +705,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
dlm_recover_masters). */
static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
uint32_t hash, uint32_t b,
int dir_nodeid, int from_nodeid,
uint32_t hash, int dir_nodeid, int from_nodeid,
unsigned int flags, struct dlm_rsb **r_ret)
{
struct dlm_rsb *r = NULL;
......@@ -761,7 +720,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (error)
goto do_new;
......@@ -823,13 +782,12 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
goto out_unlock;
r->res_hash = hash;
r->res_bucket = b;
r->res_dir_nodeid = dir_nodeid;
r->res_master_nodeid = dir_nodeid;
r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
kref_init(&r->res_ref);
error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
error = rsb_insert(r, &ls->ls_rsbtbl);
if (!error)
list_add(&r->res_rsbs_list, &ls->ls_keep);
out_unlock:
......@@ -843,22 +801,20 @@ static int find_rsb(struct dlm_ls *ls, const void *name, int len,
int from_nodeid, unsigned int flags,
struct dlm_rsb **r_ret)
{
uint32_t hash, b;
int dir_nodeid;
uint32_t hash;
if (len > DLM_RESNAME_MAXLEN)
return -EINVAL;
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
if (dlm_no_directory(ls))
return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
from_nodeid, flags, r_ret);
else
return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
return find_rsb_dir(ls, name, len, hash, dir_nodeid,
from_nodeid, flags, r_ret);
}
......@@ -1020,7 +976,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
int len, unsigned int flags, int *r_nodeid, int *result)
{
struct dlm_rsb *r = NULL;
uint32_t hash, b;
uint32_t hash;
int our_nodeid = dlm_our_nodeid();
int dir_nodeid, error;
......@@ -1034,8 +990,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
}
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
dir_nodeid = dlm_hash2nodeid(ls, hash);
if (dir_nodeid != our_nodeid) {
log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
......@@ -1051,7 +1005,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
return error;
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error) {
if (rsb_flag(r, RSB_TOSS))
goto do_toss;
......@@ -1100,7 +1054,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
goto out_unlock;
r->res_hash = hash;
r->res_bucket = b;
r->res_dir_nodeid = our_nodeid;
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
......@@ -1108,7 +1061,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
r->res_toss_time = jiffies;
rsb_set_flag(r, RSB_TOSS);
error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
error = rsb_insert(r, &ls->ls_rsbtbl);
if (error) {
/* should never happen */
dlm_free_rsb(r);
......@@ -1141,14 +1094,10 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
{
struct dlm_rsb *r = NULL;
uint32_t hash, b;
int error;
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl_lock);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (!error)
goto out;
......@@ -1168,7 +1117,7 @@ static void toss_rsb(struct kref *kref)
rsb_set_flag(r, RSB_TOSS);
list_move(&r->res_rsbs_list, &ls->ls_toss);
r->res_toss_time = jiffies;
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
r->res_lvbptr = NULL;
......@@ -1607,10 +1556,9 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb,
return error;
}
static void shrink_bucket(struct dlm_ls *ls, int b)
static void shrink_bucket(struct dlm_ls *ls)
{
struct rb_node *n, *next;
struct dlm_rsb *r;
struct dlm_rsb *r, *safe;
char *name;
int our_nodeid = dlm_our_nodeid();
int remote_count = 0;
......@@ -1621,17 +1569,12 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
spin_lock_bh(&ls->ls_rsbtbl_lock);
if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) {
spin_unlock_bh(&ls->ls_rsbtbl_lock);
return;
}
for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!rsb_flag(r, RSB_TOSS))
continue;
list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
/* If we're the directory record for this rsb, and
we're not the master of it, then we need to wait
for the master node to send us a dir remove for
......@@ -1674,14 +1617,15 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
}
list_del(&r->res_rsbs_list);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
dlm_free_rsb(r);
}
if (need_shrink)
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
else
clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
/*
......@@ -1698,7 +1642,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
len = ls->ls_remove_lens[i];
spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (rv) {
spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_error(ls, "remove_name not found %s", name);
......@@ -1743,7 +1687,8 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
}
list_del(&r->res_rsbs_list);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
send_remove(r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
......@@ -1753,14 +1698,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
void dlm_scan_rsbs(struct dlm_ls *ls)
{
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
shrink_bucket(ls, i);
if (dlm_locking_stopped(ls))
break;
cond_resched();
}
shrink_bucket(ls);
}
/* lkb is master or local copy */
......@@ -4174,7 +4112,6 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
{
char name[DLM_RESNAME_MAXLEN+1];
struct dlm_rsb *r;
uint32_t hash, b;
int rv, len, dir_nodeid, from_nodeid;
from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
......@@ -4194,24 +4131,22 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return;
}
/* Look for name on rsbtbl.toss, if it's there, kill it.
If it's on rsbtbl.keep, it's being used, and we should ignore this
message. This is an expected race between the dir node sending a
request to the master node at the same time as the master node sends
a remove to the dir node. The resolution to that race is for the
dir node to ignore the remove message, and the master node to
recreate the master rsb when it gets a request from the dir node for
an rsb it doesn't have. */
/* Look for name in rsb toss state, if it's there, kill it.
* If it's in non toss state, it's being used, and we should ignore this
* message. This is an expected race between the dir node sending a
* request to the master node at the same time as the master node sends
* a remove to the dir node. The resolution to that race is for the
* dir node to ignore the remove message, and the master node to
* recreate the master rsb when it gets a request from the dir node for
* an rsb it doesn't have.
*/
memset(name, 0, sizeof(name));
memcpy(name, ms->m_extra, len);
hash = jhash(name, len, 0);
b = hash & (ls->ls_rsbtbl_size - 1);
spin_lock_bh(&ls->ls_rsbtbl_lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
if (rv) {
/* should not happen */
log_error(ls, "%s from %d not found %s", __func__,
......@@ -4247,7 +4182,8 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
if (kref_put(&r->res_ref, kill_rsb)) {
list_del(&r->res_rsbs_list);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
} else {
......
......@@ -29,7 +29,7 @@ void dlm_unlock_recovery(struct dlm_ls *ls);
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
int len, unsigned int flags, int *r_nodeid, int *result);
int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
struct dlm_rsb **r_ret);
void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list);
......
......@@ -410,9 +410,9 @@ static int new_lockspace(const char *name, const char *cluster,
int *ops_result, dlm_lockspace_t **lockspace)
{
struct dlm_ls *ls;
int i, size, error;
int do_unreg = 0;
int namelen = strlen(name);
int i, error;
if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
return -EINVAL;
......@@ -498,15 +498,10 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_toss);
INIT_LIST_HEAD(&ls->ls_keep);
spin_lock_init(&ls->ls_rsbtbl_lock);
size = READ_ONCE(dlm_config.ci_rsbtbl_size);
ls->ls_rsbtbl_size = size;
ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
if (!ls->ls_rsbtbl)
error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
if (error)
goto out_lsfree;
for (i = 0; i < size; i++) {
ls->ls_rsbtbl[i].r.rb_node = NULL;
}
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
......@@ -669,7 +664,7 @@ static int new_lockspace(const char *name, const char *cluster,
out_rsbtbl:
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
kfree(ls->ls_remove_names[i]);
vfree(ls->ls_rsbtbl);
rhashtable_destroy(&ls->ls_rsbtbl);
out_lsfree:
if (do_unreg)
kobject_put(&ls->ls_kobj);
......@@ -772,10 +767,16 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
return rv;
}
static void rhash_free_rsb(void *ptr, void *arg)
{
struct dlm_rsb *rsb = ptr;
dlm_free_rsb(rsb);
}
static int release_lockspace(struct dlm_ls *ls, int force)
{
struct dlm_rsb *rsb;
struct rb_node *n;
int i, busy, rv;
busy = lockspace_busy(ls, force);
......@@ -834,19 +835,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
idr_destroy(&ls->ls_lkbidr);
/*
* Free all rsb's on rsbtbl[] lists
* Free all rsb's on rsbtbl
*/
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
list_del(&rsb->res_rsbs_list);
rb_erase(n, &ls->ls_rsbtbl[i].r);
dlm_free_rsb(rsb);
}
}
vfree(ls->ls_rsbtbl);
rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);
for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
kfree(ls->ls_remove_names[i]);
......
......@@ -887,7 +887,8 @@ void dlm_clear_toss(struct dlm_ls *ls)
spin_lock_bh(&ls->ls_rsbtbl_lock);
list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) {
list_del(&r->res_rsbs_list);
rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].r);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
dlm_free_rsb(r);
count++;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment