Commit 578acf9a authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: use spin_lock_bh for message processing

Use spin_lock_bh for all spinlocks involved in message processing,
in preparation for softirq message processing.  DLM lock requests
from user space involve dlm processing in user context, in addition
to the standard kernel context, necessitating bh variants.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 308533b4
......@@ -142,12 +142,12 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
cb->astparam = lkb->lkb_astparam;
INIT_WORK(&cb->work, dlm_callback_work);
spin_lock(&ls->ls_cb_lock);
spin_lock_bh(&ls->ls_cb_lock);
if (test_bit(LSFL_CB_DELAY, &ls->ls_flags))
list_add(&cb->list, &ls->ls_cb_delay);
else
queue_work(ls->ls_callback_wq, &cb->work);
spin_unlock(&ls->ls_cb_lock);
spin_unlock_bh(&ls->ls_cb_lock);
break;
case DLM_ENQUEUE_CALLBACK_SUCCESS:
break;
......@@ -179,9 +179,9 @@ void dlm_callback_stop(struct dlm_ls *ls)
void dlm_callback_suspend(struct dlm_ls *ls)
{
if (ls->ls_callback_wq) {
spin_lock(&ls->ls_cb_lock);
spin_lock_bh(&ls->ls_cb_lock);
set_bit(LSFL_CB_DELAY, &ls->ls_flags);
spin_unlock(&ls->ls_cb_lock);
spin_unlock_bh(&ls->ls_cb_lock);
flush_workqueue(ls->ls_callback_wq);
}
......@@ -199,7 +199,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
return;
more:
spin_lock(&ls->ls_cb_lock);
spin_lock_bh(&ls->ls_cb_lock);
list_for_each_entry_safe(cb, safe, &ls->ls_cb_delay, list) {
list_del(&cb->list);
queue_work(ls->ls_callback_wq, &cb->work);
......@@ -210,7 +210,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
empty = list_empty(&ls->ls_cb_delay);
if (empty)
clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
spin_unlock(&ls->ls_cb_lock);
spin_unlock_bh(&ls->ls_cb_lock);
sum += count;
if (!empty) {
......
......@@ -452,7 +452,7 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
for (node = rb_first(tree); node; node = rb_next(node)) {
r = rb_entry(node, struct dlm_rsb, res_hashnode);
......@@ -460,12 +460,12 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
return ri;
}
}
}
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
/*
* move to the first rsb in the next non-empty bucket
......@@ -484,18 +484,18 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
node = rb_first(tree);
r = rb_entry(node, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
*pos = n;
return ri;
}
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
}
}
......@@ -516,7 +516,7 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
* move to the next rsb in the same bucket
*/
spin_lock(&ls->ls_rsbtbl[bucket].lock);
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
rp = ri->rsb;
next = rb_next(&rp->res_hashnode);
......@@ -524,12 +524,12 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
dlm_put_rsb(rp);
++*pos;
return ri;
}
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
dlm_put_rsb(rp);
/*
......@@ -550,18 +550,18 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
}
tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
spin_lock(&ls->ls_rsbtbl[bucket].lock);
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!RB_EMPTY_ROOT(tree)) {
next = rb_first(tree);
r = rb_entry(next, struct dlm_rsb, res_hashnode);
dlm_hold_rsb(r);
ri->rsb = r;
ri->bucket = bucket;
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
*pos = n;
return ri;
}
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
}
}
......@@ -743,7 +743,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
goto out;
}
spin_lock(&ls->ls_waiters_lock);
spin_lock_bh(&ls->ls_waiters_lock);
memset(debug_buf, 0, sizeof(debug_buf));
list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
......@@ -754,7 +754,7 @@ static ssize_t waiters_read(struct file *file, char __user *userbuf,
break;
pos += ret;
}
spin_unlock(&ls->ls_waiters_lock);
spin_unlock_bh(&ls->ls_waiters_lock);
dlm_unlock_recovery(ls);
rv = simple_read_from_buffer(userbuf, count, ppos, debug_buf, pos);
......
......@@ -204,12 +204,12 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
hash = jhash(name, len, 0);
bucket = hash & (ls->ls_rsbtbl_size - 1);
spin_lock(&ls->ls_rsbtbl[bucket].lock);
spin_lock_bh(&ls->ls_rsbtbl[bucket].lock);
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
if (rv)
rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
name, len, &r);
spin_unlock(&ls->ls_rsbtbl[bucket].lock);
spin_unlock_bh(&ls->ls_rsbtbl[bucket].lock);
if (!rv)
return r;
......@@ -245,7 +245,7 @@ static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
{
struct dlm_dir_dump *dd, *safe;
write_lock(&ls->ls_dir_dump_lock);
write_lock_bh(&ls->ls_dir_dump_lock);
list_for_each_entry_safe(dd, safe, &ls->ls_dir_dump_list, list) {
if (dd->nodeid_init == nodeid) {
log_error(ls, "drop dump seq %llu",
......@@ -254,21 +254,21 @@ static void drop_dir_ctx(struct dlm_ls *ls, int nodeid)
kfree(dd);
}
}
write_unlock(&ls->ls_dir_dump_lock);
write_unlock_bh(&ls->ls_dir_dump_lock);
}
static struct dlm_dir_dump *lookup_dir_dump(struct dlm_ls *ls, int nodeid)
{
struct dlm_dir_dump *iter, *dd = NULL;
read_lock(&ls->ls_dir_dump_lock);
read_lock_bh(&ls->ls_dir_dump_lock);
list_for_each_entry(iter, &ls->ls_dir_dump_list, list) {
if (iter->nodeid_init == nodeid) {
dd = iter;
break;
}
}
read_unlock(&ls->ls_dir_dump_lock);
read_unlock_bh(&ls->ls_dir_dump_lock);
return dd;
}
......@@ -291,9 +291,9 @@ static struct dlm_dir_dump *init_dir_dump(struct dlm_ls *ls, int nodeid)
dd->seq_init = ls->ls_recover_seq;
dd->nodeid_init = nodeid;
write_lock(&ls->ls_dir_dump_lock);
write_lock_bh(&ls->ls_dir_dump_lock);
list_add(&dd->list, &ls->ls_dir_dump_list);
write_unlock(&ls->ls_dir_dump_lock);
write_unlock_bh(&ls->ls_dir_dump_lock);
return dd;
}
......@@ -311,7 +311,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
struct dlm_dir_dump *dd;
__be16 be_namelen;
read_lock(&ls->ls_masters_lock);
read_lock_bh(&ls->ls_masters_lock);
if (inlen > 1) {
dd = lookup_dir_dump(ls, nodeid);
......@@ -397,12 +397,12 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
log_rinfo(ls, "dlm_recover_directory nodeid %d sent %u res out %u messages",
nodeid, dd->sent_res, dd->sent_msg);
write_lock(&ls->ls_dir_dump_lock);
write_lock_bh(&ls->ls_dir_dump_lock);
list_del_init(&dd->list);
write_unlock(&ls->ls_dir_dump_lock);
write_unlock_bh(&ls->ls_dir_dump_lock);
kfree(dd);
}
out:
read_unlock(&ls->ls_masters_lock);
read_unlock_bh(&ls->ls_masters_lock);
}
This diff is collapsed.
......@@ -69,12 +69,12 @@ static inline int is_master(struct dlm_rsb *r)
static inline void lock_rsb(struct dlm_rsb *r)
{
spin_lock(&r->res_lock);
spin_lock_bh(&r->res_lock);
}
static inline void unlock_rsb(struct dlm_rsb *r)
{
spin_unlock(&r->res_lock);
spin_unlock_bh(&r->res_lock);
}
#endif
......
......@@ -251,15 +251,15 @@ static struct dlm_ls *find_ls_to_scan(void)
{
struct dlm_ls *ls;
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (time_after_eq(jiffies, ls->ls_scan_time +
dlm_config.ci_scan_secs * HZ)) {
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
return ls;
}
}
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
return NULL;
}
......@@ -306,7 +306,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
struct dlm_ls *ls;
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_global_id == id) {
......@@ -316,7 +316,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
}
ls = NULL;
out:
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
return ls;
}
......@@ -324,7 +324,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
struct dlm_ls *ls;
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_local_handle == lockspace) {
atomic_inc(&ls->ls_count);
......@@ -333,7 +333,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
}
ls = NULL;
out:
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
return ls;
}
......@@ -341,7 +341,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
{
struct dlm_ls *ls;
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (ls->ls_device.minor == minor) {
atomic_inc(&ls->ls_count);
......@@ -350,7 +350,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor)
}
ls = NULL;
out:
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
return ls;
}
......@@ -365,15 +365,15 @@ static void remove_lockspace(struct dlm_ls *ls)
retry:
wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
if (atomic_read(&ls->ls_count) != 0) {
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
goto retry;
}
WARN_ON(ls->ls_create_count != 0);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
}
static int threads_start(void)
......@@ -448,7 +448,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = 0;
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
WARN_ON(ls->ls_create_count <= 0);
if (ls->ls_namelen != namelen)
......@@ -464,7 +464,7 @@ static int new_lockspace(const char *name, const char *cluster,
error = 1;
break;
}
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
if (error)
goto out;
......@@ -583,10 +583,10 @@ static int new_lockspace(const char *name, const char *cluster,
INIT_LIST_HEAD(&ls->ls_dir_dump_list);
rwlock_init(&ls->ls_dir_dump_lock);
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
ls->ls_create_count = 1;
list_add(&ls->ls_list, &lslist);
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
if (flags & DLM_LSFL_FS) {
error = dlm_callback_start(ls);
......@@ -655,9 +655,9 @@ static int new_lockspace(const char *name, const char *cluster,
out_callback:
dlm_callback_stop(ls);
out_delist:
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_del(&ls->ls_list);
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
idr_destroy(&ls->ls_recover_idr);
kfree(ls->ls_recover_buf);
out_lkbidr:
......@@ -756,7 +756,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
{
int rv;
spin_lock(&ls->ls_lkbidr_spin);
spin_lock_bh(&ls->ls_lkbidr_spin);
if (force == 0) {
rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
} else if (force == 1) {
......@@ -764,7 +764,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force)
} else {
rv = 0;
}
spin_unlock(&ls->ls_lkbidr_spin);
spin_unlock_bh(&ls->ls_lkbidr_spin);
return rv;
}
......@@ -776,7 +776,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
busy = lockspace_busy(ls, force);
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
if (ls->ls_create_count == 1) {
if (busy) {
rv = -EBUSY;
......@@ -790,7 +790,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
} else {
rv = -EINVAL;
}
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
if (rv) {
log_debug(ls, "release_lockspace no remove %d", rv);
......@@ -918,20 +918,19 @@ void dlm_stop_lockspaces(void)
restart:
count = 0;
spin_lock(&lslist_lock);
spin_lock_bh(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
count++;
continue;
}
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
log_error(ls, "no userland control daemon, stopping lockspace");
dlm_ls_stop(ls);
goto restart;
}
spin_unlock(&lslist_lock);
spin_unlock_bh(&lslist_lock);
if (count)
log_print("dlm user daemon left %d lockspaces", count);
}
......@@ -867,36 +867,36 @@ static void process_dlm_messages(struct work_struct *work)
{
struct processqueue_entry *pentry;
spin_lock(&processqueue_lock);
spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
return;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
free_processqueue_entry(pentry);
spin_lock(&processqueue_lock);
spin_lock_bh(&processqueue_lock);
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (!pentry) {
process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
break;
}
list_del(&pentry->list);
atomic_dec(&processqueue_count);
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
}
}
......@@ -966,14 +966,14 @@ static int receive_from_sock(struct connection *con, int buflen)
memmove(con->rx_leftover_buf, pentry->buf + ret,
con->rx_leftover);
spin_lock(&processqueue_lock);
spin_lock_bh(&processqueue_lock);
ret = atomic_inc_return(&processqueue_count);
list_add_tail(&pentry->list, &processqueue);
if (!process_dlm_messages_pending) {
process_dlm_messages_pending = true;
queue_work(process_workqueue, &process_work);
}
spin_unlock(&processqueue_lock);
spin_unlock_bh(&processqueue_lock);
if (ret > DLM_MAX_PROCESS_BUFFERS)
return DLM_IO_FLUSH;
......
......@@ -630,7 +630,7 @@ int dlm_ls_stop(struct dlm_ls *ls)
* message to the requestqueue without races.
*/
write_lock(&ls->ls_recv_active);
write_lock_bh(&ls->ls_recv_active);
/*
* Abort any recovery that's in progress (see RECOVER_STOP,
......@@ -638,23 +638,23 @@ int dlm_ls_stop(struct dlm_ls *ls)
* dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
*/
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
ls->ls_recover_seq++;
/* activate requestqueue and stop processing */
write_lock(&ls->ls_requestqueue_lock);
write_lock_bh(&ls->ls_requestqueue_lock);
set_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
write_unlock(&ls->ls_requestqueue_lock);
spin_unlock(&ls->ls_recover_lock);
write_unlock_bh(&ls->ls_requestqueue_lock);
spin_unlock_bh(&ls->ls_recover_lock);
/*
* Let dlm_recv run again, now any normal messages will be saved on the
* requestqueue for later.
*/
write_unlock(&ls->ls_recv_active);
write_unlock_bh(&ls->ls_recv_active);
/*
* This in_recovery lock does two things:
......@@ -679,13 +679,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
dlm_recoverd_suspend(ls);
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
kfree(ls->ls_slots);
ls->ls_slots = NULL;
ls->ls_num_slots = 0;
ls->ls_slots_size = 0;
ls->ls_recover_status = 0;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
dlm_recoverd_resume(ls);
......@@ -719,12 +719,12 @@ int dlm_ls_start(struct dlm_ls *ls)
if (error < 0)
goto fail_rv;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
/* the lockspace needs to be stopped before it can be started */
if (!dlm_locking_stopped(ls)) {
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
log_error(ls, "start ignored: lockspace running");
error = -EINVAL;
goto fail;
......@@ -735,7 +735,7 @@ int dlm_ls_start(struct dlm_ls *ls)
rv->seq = ++ls->ls_recover_seq;
rv_old = ls->ls_recover_args;
ls->ls_recover_args = rv;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
if (rv_old) {
log_error(ls, "unused recovery %llx %d",
......
......@@ -364,9 +364,9 @@ int dlm_midcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
node->users = 0;
midcomms_node_reset(node);
spin_lock(&nodes_lock);
spin_lock_bh(&nodes_lock);
hlist_add_head_rcu(&node->hlist, &node_hash[r]);
spin_unlock(&nodes_lock);
spin_unlock_bh(&nodes_lock);
node->debugfs = dlm_create_debug_comms_file(nodeid, node);
return 0;
......@@ -477,7 +477,7 @@ static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
{
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive passive fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -491,13 +491,13 @@ static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
}
static void dlm_receive_buffer_3_2_trace(uint32_t seq,
......@@ -534,7 +534,7 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -575,13 +575,13 @@ static void dlm_midcomms_receive_buffer(const union dlm_packet *p,
/* probably remove_member caught it, do nothing */
break;
default:
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
break;
default:
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
......@@ -1182,7 +1182,7 @@ void dlm_midcomms_exit(void)
static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
{
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive active fin ack from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -1202,13 +1202,13 @@ static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
wake_up(&node->shutdown_wait);
break;
default:
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
log_print("%s: unexpected state: %d",
__func__, node->state);
WARN_ON_ONCE(1);
return;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
}
void dlm_midcomms_add_member(int nodeid)
......@@ -1223,7 +1223,7 @@ void dlm_midcomms_add_member(int nodeid)
return;
}
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
if (!node->users) {
pr_debug("receive add member from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -1251,7 +1251,7 @@ void dlm_midcomms_add_member(int nodeid)
node->users++;
pr_debug("node %d users inc count %d\n", nodeid, node->users);
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
......@@ -1269,13 +1269,13 @@ void dlm_midcomms_remove_member(int nodeid)
return;
}
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
/* case of dlm_midcomms_addr() created node but
* was not added before because dlm_midcomms_close()
* removed the node
*/
if (!node->users) {
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
return;
}
......@@ -1313,7 +1313,7 @@ void dlm_midcomms_remove_member(int nodeid)
break;
}
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
srcu_read_unlock(&nodes_srcu, idx);
}
......@@ -1351,7 +1351,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
return;
}
spin_lock(&node->state_lock);
spin_lock_bh(&node->state_lock);
pr_debug("receive active shutdown for node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
switch (node->state) {
......@@ -1370,7 +1370,7 @@ static void midcomms_shutdown(struct midcomms_node *node)
*/
break;
}
spin_unlock(&node->state_lock);
spin_unlock_bh(&node->state_lock);
if (DLM_DEBUG_FENCE_TERMINATION)
msleep(5000);
......@@ -1441,9 +1441,9 @@ int dlm_midcomms_close(int nodeid)
ret = dlm_lowcomms_close(nodeid);
dlm_delete_debug_comms_file(node->debugfs);
spin_lock(&nodes_lock);
spin_lock_bh(&nodes_lock);
hlist_del_rcu(&node->hlist);
spin_unlock(&nodes_lock);
spin_unlock_bh(&nodes_lock);
srcu_read_unlock(&nodes_srcu, idx);
/* wait that all readers left until flush send queue */
......
......@@ -143,18 +143,18 @@ static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq)
{
spin_lock(&ls->ls_rcom_spin);
spin_lock_bh(&ls->ls_rcom_spin);
*new_seq = cpu_to_le64(++ls->ls_rcom_seq);
set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
spin_unlock(&ls->ls_rcom_spin);
spin_unlock_bh(&ls->ls_rcom_spin);
}
static void disallow_sync_reply(struct dlm_ls *ls)
{
spin_lock(&ls->ls_rcom_spin);
spin_lock_bh(&ls->ls_rcom_spin);
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
spin_unlock(&ls->ls_rcom_spin);
spin_unlock_bh(&ls->ls_rcom_spin);
}
/*
......@@ -245,10 +245,10 @@ static void receive_rcom_status(struct dlm_ls *ls,
goto do_create;
}
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
num_slots = ls->ls_num_slots;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
len += num_slots * sizeof(struct rcom_slot);
do_create:
......@@ -266,9 +266,9 @@ static void receive_rcom_status(struct dlm_ls *ls,
if (!num_slots)
goto do_send;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_num_slots != num_slots) {
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
log_debug(ls, "receive_rcom_status num_slots %d to %d",
num_slots, ls->ls_num_slots);
rc->rc_result = 0;
......@@ -277,7 +277,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
}
dlm_slots_copy_out(ls, rc);
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
do_send:
send_rcom_stateless(msg, rc);
......@@ -285,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls,
static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
{
spin_lock(&ls->ls_rcom_spin);
spin_lock_bh(&ls->ls_rcom_spin);
if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) {
log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
......@@ -301,7 +301,7 @@ static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in)
clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
wake_up(&ls->ls_wait_general);
out:
spin_unlock(&ls->ls_rcom_spin);
spin_unlock_bh(&ls->ls_rcom_spin);
}
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
......@@ -613,11 +613,11 @@ void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid)
break;
}
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = dlm_recovery_stopped(ls);
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS)))
goto ignore;
......
......@@ -74,9 +74,9 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
uint32_t dlm_recover_status(struct dlm_ls *ls)
{
uint32_t status;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
status = ls->ls_recover_status;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
return status;
}
......@@ -87,9 +87,9 @@ static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, status);
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
......@@ -188,13 +188,13 @@ int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
if (!rv) {
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
_set_recover_status(ls, DLM_RS_NODES_ALL);
ls->ls_num_slots = num_slots;
ls->ls_slots_size = slots_size;
ls->ls_slots = slots;
ls->ls_generation = gen;
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
} else {
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
......@@ -241,9 +241,9 @@ static int recover_list_empty(struct dlm_ls *ls)
{
int empty;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
empty = list_empty(&ls->ls_recover_list);
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
return empty;
}
......@@ -252,23 +252,23 @@ static void recover_list_add(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
if (list_empty(&r->res_recover_list)) {
list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
ls->ls_recover_list_count++;
dlm_hold_rsb(r);
}
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
}
static void recover_list_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
list_del_init(&r->res_recover_list);
ls->ls_recover_list_count--;
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
dlm_put_rsb(r);
}
......@@ -277,7 +277,7 @@ static void recover_list_clear(struct dlm_ls *ls)
{
struct dlm_rsb *r, *s;
spin_lock(&ls->ls_recover_list_lock);
spin_lock_bh(&ls->ls_recover_list_lock);
list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
list_del_init(&r->res_recover_list);
r->res_recover_locks_count = 0;
......@@ -290,17 +290,17 @@ static void recover_list_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
spin_unlock(&ls->ls_recover_list_lock);
spin_unlock_bh(&ls->ls_recover_list_lock);
}
static int recover_idr_empty(struct dlm_ls *ls)
{
int empty = 1;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
if (ls->ls_recover_list_count)
empty = 0;
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
return empty;
}
......@@ -310,7 +310,7 @@ static int recover_idr_add(struct dlm_rsb *r)
struct dlm_ls *ls = r->res_ls;
int rv;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
if (r->res_id) {
rv = -1;
goto out_unlock;
......@@ -324,7 +324,7 @@ static int recover_idr_add(struct dlm_rsb *r)
dlm_hold_rsb(r);
rv = 0;
out_unlock:
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
return rv;
}
......@@ -332,11 +332,11 @@ static void recover_idr_del(struct dlm_rsb *r)
{
struct dlm_ls *ls = r->res_ls;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
idr_remove(&ls->ls_recover_idr, r->res_id);
r->res_id = 0;
ls->ls_recover_list_count--;
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
dlm_put_rsb(r);
}
......@@ -345,9 +345,9 @@ static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
struct dlm_rsb *r;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
r = idr_find(&ls->ls_recover_idr, (int)id);
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
return r;
}
......@@ -356,7 +356,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
struct dlm_rsb *r;
int id;
spin_lock(&ls->ls_recover_idr_lock);
spin_lock_bh(&ls->ls_recover_idr_lock);
idr_for_each_entry(&ls->ls_recover_idr, r, id) {
idr_remove(&ls->ls_recover_idr, id);
......@@ -372,7 +372,7 @@ static void recover_idr_clear(struct dlm_ls *ls)
ls->ls_recover_list_count);
ls->ls_recover_list_count = 0;
}
spin_unlock(&ls->ls_recover_idr_lock);
spin_unlock_bh(&ls->ls_recover_idr_lock);
}
......@@ -887,7 +887,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
int i;
for (i = 0; i < ls->ls_rsbtbl_size; i++) {
spin_lock(&ls->ls_rsbtbl[i].lock);
spin_lock_bh(&ls->ls_rsbtbl[i].lock);
for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
next = rb_next(n);
r = rb_entry(n, struct dlm_rsb, res_hashnode);
......@@ -895,7 +895,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
dlm_free_rsb(r);
count++;
}
spin_unlock(&ls->ls_rsbtbl[i].lock);
spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
if (count)
......
......@@ -26,7 +26,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
struct dlm_rsb *r;
int i, error = 0;
write_lock(&ls->ls_masters_lock);
write_lock_bh(&ls->ls_masters_lock);
if (!list_empty(&ls->ls_masters_list)) {
log_error(ls, "root list not empty");
error = -EINVAL;
......@@ -46,7 +46,7 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
spin_unlock_bh(&ls->ls_rsbtbl[i].lock);
}
out:
write_unlock(&ls->ls_masters_lock);
write_unlock_bh(&ls->ls_masters_lock);
return error;
}
......@@ -54,12 +54,12 @@ static void dlm_release_masters_list(struct dlm_ls *ls)
{
struct dlm_rsb *r, *safe;
write_lock(&ls->ls_masters_lock);
write_lock_bh(&ls->ls_masters_lock);
list_for_each_entry_safe(r, safe, &ls->ls_masters_list, res_masters_list) {
list_del_init(&r->res_masters_list);
dlm_put_rsb(r);
}
write_unlock(&ls->ls_masters_lock);
write_unlock_bh(&ls->ls_masters_lock);
}
static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
......@@ -103,9 +103,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
int error = -EINTR;
write_lock(&ls->ls_recv_active);
write_lock_bh(&ls->ls_recv_active);
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags);
/* unblocks processes waiting to enter the dlm */
......@@ -113,9 +113,9 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
error = 0;
}
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
write_unlock(&ls->ls_recv_active);
write_unlock_bh(&ls->ls_recv_active);
return error;
}
......@@ -349,12 +349,12 @@ static void do_ls_recovery(struct dlm_ls *ls)
struct dlm_recover *rv = NULL;
int error;
spin_lock(&ls->ls_recover_lock);
spin_lock_bh(&ls->ls_recover_lock);
rv = ls->ls_recover_args;
ls->ls_recover_args = NULL;
if (rv && ls->ls_recover_seq == rv->seq)
clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
spin_unlock(&ls->ls_recover_lock);
spin_unlock_bh(&ls->ls_recover_lock);
if (rv) {
error = ls_recover(ls, rv);
......
......@@ -68,7 +68,7 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
int error = 0;
write_lock(&ls->ls_requestqueue_lock);
write_lock_bh(&ls->ls_requestqueue_lock);
for (;;) {
if (list_empty(&ls->ls_requestqueue)) {
clear_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags);
......@@ -96,11 +96,11 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
error = -EINTR;
break;
}
write_unlock(&ls->ls_requestqueue_lock);
write_unlock_bh(&ls->ls_requestqueue_lock);
schedule();
write_lock(&ls->ls_requestqueue_lock);
write_lock_bh(&ls->ls_requestqueue_lock);
}
write_unlock(&ls->ls_requestqueue_lock);
write_unlock_bh(&ls->ls_requestqueue_lock);
return error;
}
......@@ -135,7 +135,7 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
struct dlm_message *ms;
struct rq_entry *e, *safe;
write_lock(&ls->ls_requestqueue_lock);
write_lock_bh(&ls->ls_requestqueue_lock);
list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
ms = &e->request;
......@@ -144,6 +144,6 @@ void dlm_purge_requestqueue(struct dlm_ls *ls)
kfree(e);
}
}
write_unlock(&ls->ls_requestqueue_lock);
write_unlock_bh(&ls->ls_requestqueue_lock);
}
......@@ -189,7 +189,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
return;
ls = lkb->lkb_resource->res_ls;
spin_lock(&ls->ls_clear_proc_locks);
spin_lock_bh(&ls->ls_clear_proc_locks);
/* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
can't be delivered. For ORPHAN's, dlm_clear_proc_locks() freed
......@@ -211,7 +211,7 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
set_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags);
spin_lock(&proc->asts_spin);
spin_lock_bh(&proc->asts_spin);
rv = dlm_queue_lkb_callback(lkb, flags, mode, status, sbflags, &cb);
switch (rv) {
......@@ -232,23 +232,23 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
case DLM_ENQUEUE_CALLBACK_FAILURE:
fallthrough;
default:
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
WARN_ON_ONCE(1);
goto out;
}
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
/* N.B. spin_lock locks_spin, not asts_spin */
spin_lock(&proc->locks_spin);
spin_lock_bh(&proc->locks_spin);
if (!list_empty(&lkb->lkb_ownqueue)) {
list_del_init(&lkb->lkb_ownqueue);
dlm_put_lkb(lkb);
}
spin_unlock(&proc->locks_spin);
spin_unlock_bh(&proc->locks_spin);
}
out:
spin_unlock(&ls->ls_clear_proc_locks);
spin_unlock_bh(&ls->ls_clear_proc_locks);
}
static int device_user_lock(struct dlm_user_proc *proc,
......@@ -817,10 +817,10 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
return -EINVAL;
spin_lock(&proc->asts_spin);
spin_lock_bh(&proc->asts_spin);
if (list_empty(&proc->asts)) {
if (file->f_flags & O_NONBLOCK) {
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
return -EAGAIN;
}
......@@ -829,16 +829,16 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
repeat:
set_current_state(TASK_INTERRUPTIBLE);
if (list_empty(&proc->asts) && !signal_pending(current)) {
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
schedule();
spin_lock(&proc->asts_spin);
spin_lock_bh(&proc->asts_spin);
goto repeat;
}
set_current_state(TASK_RUNNING);
remove_wait_queue(&proc->wait, &wait);
if (signal_pending(current)) {
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
return -ERESTARTSYS;
}
}
......@@ -849,7 +849,7 @@ static ssize_t device_read(struct file *file, char __user *buf, size_t count,
cb = list_first_entry(&proc->asts, struct dlm_callback, list);
list_del(&cb->list);
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
if (cb->flags & DLM_CB_BAST) {
trace_dlm_bast(cb->ls_id, cb->lkb_id, cb->mode, cb->res_name,
......@@ -874,12 +874,12 @@ static __poll_t device_poll(struct file *file, poll_table *wait)
poll_wait(file, &proc->wait, wait);
spin_lock(&proc->asts_spin);
spin_lock_bh(&proc->asts_spin);
if (!list_empty(&proc->asts)) {
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
return EPOLLIN | EPOLLRDNORM;
}
spin_unlock(&proc->asts_spin);
spin_unlock_bh(&proc->asts_spin);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment