Commit 0a37714f authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'dlm-6.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "The dlm posix lock handling (for gfs2) has three notable changes:

   - Local pids returned from GETLK are no longer negated. A previous
     patch negating remote pids mistakenly changed local pids also.

   - SETLKW operations can now be interrupted only when the process is
     killed, and not from other signals. General interruption was
     resulting in previously acquired locks being cleared, not just the
     in-progress lock. Handling this correctly will require extending a
     cancel capability to user space (a future feature.)

   - If multiple threads are requesting posix locks (with SETLKW), fix
     incorrect matching of results to the requests.

  The dlm networking has several minor cleanups, and one notable change:

   - Avoid delaying ack messages for too long (used for message
     reliability), resulting in a backlog of un-acked messages. These
     could previously be delayed as a result of either too many or too
     few other messages being sent. Now an upper and lower threshold is
     used to determine when an ack should be sent"

* tag 'dlm-6.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: remove filter local comms on close
  fs: dlm: add send ack threshold and append acks to msgs
  fs: dlm: handle sequence numbers as atomic
  fs: dlm: handle lkb wait count as atomic_t
  fs: dlm: filter ourself midcomms calls
  fs: dlm: warn about messages from left nodes
  fs: dlm: move dlm_purge_lkb_callbacks to user module
  fs: dlm: cleanup STOP_IO bitflag set when stop io
  fs: dlm: don't check othercon twice
  fs: dlm: unregister memory at the very last
  fs: dlm: fix missing pending to false
  fs: dlm: clear pending bit when queue was empty
  fs: dlm: revert check required context while close
  fs: dlm: fix mismatch of plock results from userspace
  fs: dlm: make F_SETLK use unkillable wait_event
  fs: dlm: interrupt posix locks only when process is killed
  fs: dlm: fix cleanup pending ops when interrupted
  fs: dlm: return positive pid value for F_GETLK
  dlm: Replace all non-returning strlcpy with strscpy
parents 9e06150d fc4ea422
......@@ -36,23 +36,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
*from = to;
}
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
{
struct dlm_callback *cb, *safe;
list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
list_del(&cb->list);
kref_put(&cb->ref, dlm_release_callback);
}
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
/* invalidate */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
lkb->lkb_last_bast_mode = -1;
}
int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags)
{
......@@ -181,10 +164,12 @@ void dlm_callback_work(struct work_struct *work)
spin_lock(&lkb->lkb_cb_lock);
rv = dlm_dequeue_lkb_callback(lkb, &cb);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY)) {
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
spin_unlock(&lkb->lkb_cb_lock);
if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
goto out;
}
spin_unlock(&lkb->lkb_cb_lock);
for (;;) {
castfn = lkb->lkb_astfn;
......
......@@ -26,7 +26,6 @@ void dlm_callback_set_last_ptr(struct dlm_callback **from,
struct dlm_callback *to);
void dlm_release_callback(struct kref *ref);
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
void dlm_callback_work(struct work_struct *work);
int dlm_callback_start(struct dlm_ls *ls);
void dlm_callback_stop(struct dlm_ls *ls);
......
......@@ -246,7 +246,7 @@ struct dlm_lkb {
int8_t lkb_highbast; /* highest mode bast sent for */
int8_t lkb_wait_type; /* type of reply waiting for */
int8_t lkb_wait_count;
atomic_t lkb_wait_count;
int lkb_wait_nodeid; /* for debugging */
struct list_head lkb_statequeue; /* rsb g/c/w list */
......
......@@ -1407,6 +1407,7 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error = 0;
int wc;
mutex_lock(&ls->ls_waiters_mutex);
......@@ -1428,20 +1429,17 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
error = -EBUSY;
goto out;
}
lkb->lkb_wait_count++;
wc = atomic_inc_return(&lkb->lkb_wait_count);
hold_lkb(lkb);
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
lkb->lkb_id, lkb->lkb_wait_type, mstype,
lkb->lkb_wait_count, dlm_iflags_val(lkb));
lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
dlm_iflags_val(lkb));
goto out;
}
DLM_ASSERT(!lkb->lkb_wait_count,
dlm_print_lkb(lkb);
printk("wait_count %d\n", lkb->lkb_wait_count););
lkb->lkb_wait_count++;
wc = atomic_fetch_inc(&lkb->lkb_wait_count);
DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
lkb->lkb_wait_type = mstype;
lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
hold_lkb(lkb);
......@@ -1504,7 +1502,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
lkb->lkb_id);
lkb->lkb_wait_type = 0;
lkb->lkb_wait_count--;
atomic_dec(&lkb->lkb_wait_count);
unhold_lkb(lkb);
goto out_del;
}
......@@ -1531,16 +1529,15 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
if (overlap_done && lkb->lkb_wait_type) {
log_error(ls, "remwait error %x reply %d wait_type %d overlap",
lkb->lkb_id, mstype, lkb->lkb_wait_type);
lkb->lkb_wait_count--;
atomic_dec(&lkb->lkb_wait_count);
unhold_lkb(lkb);
lkb->lkb_wait_type = 0;
}
DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
lkb->lkb_wait_count--;
if (!lkb->lkb_wait_count)
if (atomic_dec_and_test(&lkb->lkb_wait_count))
list_del_init(&lkb->lkb_wait_reply);
unhold_lkb(lkb);
return 0;
......@@ -2669,7 +2666,7 @@ static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
goto out;
/* lock not allowed if there's any op in progress */
if (lkb->lkb_wait_type || lkb->lkb_wait_count)
if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
goto out;
if (is_overlap(lkb))
......@@ -2731,7 +2728,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
/* normal unlock not allowed if there's any op in progress */
if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
(lkb->lkb_wait_type || lkb->lkb_wait_count))
(lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
goto out;
/* an lkb may be waiting for an rsb lookup to complete where the
......@@ -4616,7 +4613,7 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
{
int error = 0, noent = 0;
if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
log_limit(ls, "receive %d from non-member %d %x %x %d",
le32_to_cpu(ms->m_type),
le32_to_cpu(ms->m_header.h_nodeid),
......@@ -4754,7 +4751,7 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
/* If we were a member of this lockspace, left, and rejoined,
other nodes may still be sending us messages from the
lockspace generation before we left. */
if (!ls->ls_generation) {
if (WARN_ON_ONCE(!ls->ls_generation)) {
log_limit(ls, "receive %d from %d ignore old gen",
le32_to_cpu(ms->m_type), nodeid);
return;
......@@ -5066,10 +5063,9 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
/* drop all wait_count references we still
* hold a reference for this iteration.
*/
while (lkb->lkb_wait_count) {
lkb->lkb_wait_count--;
while (!atomic_dec_and_test(&lkb->lkb_wait_count))
unhold_lkb(lkb);
}
mutex_lock(&ls->ls_waiters_mutex);
list_del_init(&lkb->lkb_wait_reply);
mutex_unlock(&ls->ls_waiters_mutex);
......
......@@ -935,15 +935,3 @@ void dlm_stop_lockspaces(void)
log_print("dlm user daemon left %d lockspaces", count);
}
void dlm_stop_lockspaces_check(void)
{
struct dlm_ls *ls;
spin_lock(&lslist_lock);
list_for_each_entry(ls, &lslist, ls_list) {
if (WARN_ON(!rwsem_is_locked(&ls->ls_in_recovery) ||
!dlm_locking_stopped(ls)))
break;
}
spin_unlock(&lslist_lock);
}
......@@ -27,7 +27,6 @@ struct dlm_ls *dlm_find_lockspace_local(void *id);
struct dlm_ls *dlm_find_lockspace_device(int minor);
void dlm_put_lockspace(struct dlm_ls *ls);
void dlm_stop_lockspaces(void);
void dlm_stop_lockspaces_check(void);
int dlm_new_user_lockspace(const char *name, const char *cluster,
uint32_t flags, int lvblen,
const struct dlm_lockspace_ops *ops,
......
......@@ -546,9 +546,6 @@ int dlm_lowcomms_connect_node(int nodeid)
struct connection *con;
int idx;
if (nodeid == dlm_our_nodeid())
return 0;
idx = srcu_read_lock(&connections_srcu);
con = nodeid2con(nodeid, 0);
if (WARN_ON_ONCE(!con)) {
......@@ -735,19 +732,15 @@ static void stop_connection_io(struct connection *con)
if (con->othercon)
stop_connection_io(con->othercon);
spin_lock_bh(&con->writequeue_lock);
set_bit(CF_IO_STOP, &con->flags);
spin_unlock_bh(&con->writequeue_lock);
down_write(&con->sock_lock);
if (con->sock) {
lock_sock(con->sock->sk);
restore_callbacks(con->sock->sk);
spin_lock_bh(&con->writequeue_lock);
set_bit(CF_IO_STOP, &con->flags);
spin_unlock_bh(&con->writequeue_lock);
release_sock(con->sock->sk);
} else {
spin_lock_bh(&con->writequeue_lock);
set_bit(CF_IO_STOP, &con->flags);
spin_unlock_bh(&con->writequeue_lock);
}
up_write(&con->sock_lock);
......@@ -867,30 +860,8 @@ struct dlm_processed_nodes {
struct list_head list;
};
static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
struct dlm_processed_nodes *n;
list_for_each_entry(n, processed_nodes, list) {
/* we already remembered this node */
if (n->nodeid == nodeid)
return;
}
/* if it's fails in worst case we simple don't send an ack back.
* We try it next time.
*/
n = kmalloc(sizeof(*n), GFP_NOFS);
if (!n)
return;
n->nodeid = nodeid;
list_add(&n->list, processed_nodes);
}
static void process_dlm_messages(struct work_struct *work)
{
struct dlm_processed_nodes *n, *n_tmp;
struct processqueue_entry *pentry;
LIST_HEAD(processed_nodes);
......@@ -898,6 +869,7 @@ static void process_dlm_messages(struct work_struct *work)
pentry = list_first_entry_or_null(&processqueue,
struct processqueue_entry, list);
if (WARN_ON_ONCE(!pentry)) {
process_dlm_messages_pending = false;
spin_unlock(&processqueue_lock);
return;
}
......@@ -908,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
for (;;) {
dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
pentry->buflen);
add_processed_node(pentry->nodeid, &processed_nodes);
free_processqueue_entry(pentry);
spin_lock(&processqueue_lock);
......@@ -923,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
list_del(&pentry->list);
spin_unlock(&processqueue_lock);
}
/* send ack back after we processed couple of messages */
list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
list_del(&n->list);
dlm_midcomms_receive_done(n->nodeid);
kfree(n);
}
}
/* Data received from remote end */
......@@ -1500,7 +1464,6 @@ int dlm_lowcomms_close(int nodeid)
call_srcu(&connections_srcu, &con->rcu, connection_release);
if (con->othercon) {
clean_one_writequeue(con->othercon);
if (con->othercon)
call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
}
srcu_read_unlock(&connections_srcu, idx);
......
......@@ -73,10 +73,10 @@ static void __exit exit_dlm(void)
dlm_plock_exit();
dlm_user_exit();
dlm_config_exit();
dlm_memory_exit();
dlm_lockspace_exit();
dlm_midcomms_exit();
dlm_unregister_debugfs();
dlm_memory_exit();
}
module_init(init_dlm);
......
......@@ -307,6 +307,21 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
}
}
static int add_remote_member(int nodeid)
{
int error;
if (nodeid == dlm_our_nodeid())
return 0;
error = dlm_lowcomms_connect_node(nodeid);
if (error < 0)
return error;
dlm_midcomms_add_member(nodeid);
return 0;
}
static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
{
struct dlm_member *memb;
......@@ -316,16 +331,16 @@ static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
if (!memb)
return -ENOMEM;
error = dlm_lowcomms_connect_node(node->nodeid);
memb->nodeid = node->nodeid;
memb->weight = node->weight;
memb->comm_seq = node->comm_seq;
error = add_remote_member(node->nodeid);
if (error < 0) {
kfree(memb);
return error;
}
memb->nodeid = node->nodeid;
memb->weight = node->weight;
memb->comm_seq = node->comm_seq;
dlm_midcomms_add_member(node->nodeid);
add_ordered_member(ls, memb);
ls->ls_num_nodes++;
return 0;
......@@ -370,11 +385,19 @@ static void clear_memb_list(struct list_head *head,
}
}
static void clear_members_cb(int nodeid)
static void remove_remote_member(int nodeid)
{
if (nodeid == dlm_our_nodeid())
return;
dlm_midcomms_remove_member(nodeid);
}
static void clear_members_cb(int nodeid)
{
remove_remote_member(nodeid);
}
void dlm_clear_members(struct dlm_ls *ls)
{
clear_memb_list(&ls->ls_nodes, clear_members_cb);
......@@ -562,7 +585,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
neg++;
list_move(&memb->list, &ls->ls_nodes_gone);
dlm_midcomms_remove_member(memb->nodeid);
remove_remote_member(memb->nodeid);
ls->ls_num_nodes--;
dlm_lsop_recover_slot(ls, memb);
}
......
......@@ -136,7 +136,6 @@
#include <net/tcp.h>
#include "dlm_internal.h"
#include "lockspace.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
......@@ -149,12 +148,14 @@
/* 5 seconds wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
#define DLM_VERSION_NOT_SET 0
#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)
struct midcomms_node {
int nodeid;
uint32_t version;
uint32_t seq_send;
uint32_t seq_next;
atomic_t seq_send;
atomic_t seq_next;
/* These queues are unbound because we cannot drop any message in dlm.
* We could send a fence signal for a specific node to the cluster
* manager if queues hits some maximum value, however this handling
......@@ -166,7 +167,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
#define DLM_NODE_ULP_DELIVERED 4
atomic_t ulp_delivered;
unsigned long flags;
wait_queue_head_t shutdown_wait;
......@@ -318,8 +319,9 @@ static void midcomms_node_reset(struct midcomms_node *node)
{
pr_debug("reset node %d\n", node->nodeid);
node->seq_next = DLM_SEQ_INIT;
node->seq_send = DLM_SEQ_INIT;
atomic_set(&node->seq_next, DLM_SEQ_INIT);
atomic_set(&node->seq_send, DLM_SEQ_INIT);
atomic_set(&node->ulp_delivered, 0);
node->version = DLM_VERSION_NOT_SET;
node->flags = 0;
......@@ -394,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
return 0;
}
static void dlm_send_ack_threshold(struct midcomms_node *node,
uint32_t threshold)
{
uint32_t oval, nval;
bool send_ack;
/* let only send one user trigger threshold to send ack back */
do {
oval = atomic_read(&node->ulp_delivered);
send_ack = (oval > threshold);
/* abort if threshold is not reached */
if (!send_ack)
break;
nval = 0;
/* try to reset ulp_delivered counter */
} while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
if (send_ack)
dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
}
static int dlm_send_fin(struct midcomms_node *node,
void (*ack_rcv)(struct midcomms_node *node))
{
......@@ -493,9 +517,19 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
struct midcomms_node *node,
uint32_t seq)
{
if (seq == node->seq_next) {
node->seq_next++;
bool is_expected_seq;
uint32_t oval, nval;
do {
oval = atomic_read(&node->seq_next);
is_expected_seq = (oval == seq);
if (!is_expected_seq)
break;
nval = oval + 1;
} while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
spin_lock(&node->state_lock);
......@@ -504,7 +538,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
switch (node->state) {
case DLM_ESTABLISHED:
dlm_send_ack(node->nodeid, node->seq_next);
dlm_send_ack(node->nodeid, nval);
/* passive shutdown DLM_LAST_ACK case 1
* additional we check if the node is used by
......@@ -523,14 +557,14 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
}
break;
case DLM_FIN_WAIT1:
dlm_send_ack(node->nodeid, node->seq_next);
dlm_send_ack(node->nodeid, nval);
node->state = DLM_CLOSING;
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_FIN_WAIT2:
dlm_send_ack(node->nodeid, node->seq_next);
dlm_send_ack(node->nodeid, nval);
midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
......@@ -551,18 +585,20 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer_3_2_trace(seq, p);
dlm_receive_buffer(p, node->nodeid);
set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
atomic_inc(&node->ulp_delivered);
/* unlikely case to send ack back when we don't transmit */
dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
break;
}
} else {
/* retry to ack message which we already have by sending back
* current node->seq_next number as ack.
*/
if (seq < node->seq_next)
dlm_send_ack(node->nodeid, node->seq_next);
if (seq < oval)
dlm_send_ack(node->nodeid, oval);
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
seq, node->seq_next, node->nodeid);
seq, oval, node->nodeid);
}
}
......@@ -960,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
void dlm_midcomms_receive_done(int nodeid)
{
struct midcomms_node *node;
int idx;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
/* old protocol, we do nothing */
switch (node->version) {
case DLM_VERSION_3_2:
break;
default:
srcu_read_unlock(&nodes_srcu, idx);
return;
}
/* do nothing if we didn't delivered stateful to ulp */
if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
&node->flags)) {
srcu_read_unlock(&nodes_srcu, idx);
return;
}
spin_lock(&node->state_lock);
/* we only ack if state is ESTABLISHED */
switch (node->state) {
case DLM_ESTABLISHED:
spin_unlock(&node->state_lock);
dlm_send_ack(node->nodeid, node->seq_next);
break;
default:
spin_unlock(&node->state_lock);
/* do nothing FIN has it's own ack send */
break;
}
srcu_read_unlock(&nodes_srcu, idx);
}
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;
......@@ -1059,7 +1052,7 @@ static void midcomms_new_msg_cb(void *data)
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock_bh(&mh->node->send_queue_lock);
mh->seq = mh->node->seq_send++;
mh->seq = atomic_fetch_inc(&mh->node->seq_send);
}
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
......@@ -1133,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
goto err;
}
/* send ack back if necessary */
dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
break;
default:
dlm_free_mhandle(mh);
......@@ -1281,9 +1276,6 @@ void dlm_midcomms_add_member(int nodeid)
struct midcomms_node *node;
int idx;
if (nodeid == dlm_our_nodeid())
return;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, GFP_NOFS);
if (!node) {
......@@ -1329,9 +1321,6 @@ void dlm_midcomms_remove_member(int nodeid)
struct midcomms_node *node;
int idx;
if (nodeid == dlm_our_nodeid())
return;
idx = srcu_read_lock(&nodes_srcu);
node = nodeid2node(nodeid, 0);
if (!node) {
......@@ -1488,11 +1477,6 @@ int dlm_midcomms_close(int nodeid)
struct midcomms_node *node;
int idx, ret;
if (nodeid == dlm_our_nodeid())
return 0;
dlm_stop_lockspaces_check();
idx = srcu_read_lock(&nodes_srcu);
/* Abort pending close/remove operation */
node = nodeid2node(nodeid, 0);
......@@ -1542,7 +1526,7 @@ static void midcomms_new_rawmsg_cb(void *data)
switch (h->h_cmd) {
case DLM_OPTS:
if (!h->u.h_seq)
h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
break;
default:
break;
......
......@@ -30,8 +30,6 @@ struct plock_async_data {
struct plock_op {
struct list_head list;
int done;
/* if lock op got interrupted while waiting dlm_controld reply */
bool sigint;
struct dlm_plock_info info;
/* if set indicates async handling */
struct plock_async_data *data;
......@@ -157,7 +155,8 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
send_op(op);
rv = wait_event_interruptible(recv_wq, (op->done != 0));
if (op->info.wait) {
rv = wait_event_killable(recv_wq, (op->done != 0));
if (rv == -ERESTARTSYS) {
spin_lock(&ops_lock);
/* recheck under ops_lock if we got a done != 0,
......@@ -167,14 +166,19 @@ int dlm_posix_lock(dlm_lockspace_t *lockspace, u64 number, struct file *file,
spin_unlock(&ops_lock);
goto do_lock_wait;
}
op->sigint = true;
list_del(&op->list);
spin_unlock(&ops_lock);
log_debug(ls, "%s: wait interrupted %x %llx pid %d",
__func__, ls->ls_global_id,
(unsigned long long)number, op->info.pid);
do_unlock_close(&op->info);
dlm_release_plock_op(op);
goto out;
}
} else {
wait_event(recv_wq, (op->done != 0));
}
do_lock_wait:
......@@ -360,7 +364,9 @@ int dlm_posix_get(dlm_lockspace_t *lockspace, u64 number, struct file *file,
locks_init_lock(fl);
fl->fl_type = (op->info.ex) ? F_WRLCK : F_RDLCK;
fl->fl_flags = FL_POSIX;
fl->fl_pid = -op->info.pid;
fl->fl_pid = op->info.pid;
if (op->info.nodeid != dlm_our_nodeid())
fl->fl_pid = -fl->fl_pid;
fl->fl_start = op->info.start;
fl->fl_end = op->info.end;
rv = 0;
......@@ -389,7 +395,7 @@ static ssize_t dev_read(struct file *file, char __user *u, size_t count,
if (op->info.flags & DLM_PLOCK_FL_CLOSE)
list_del(&op->list);
else
list_move(&op->list, &recv_list);
list_move_tail(&op->list, &recv_list);
memcpy(&info, &op->info, sizeof(info));
}
spin_unlock(&ops_lock);
......@@ -427,34 +433,53 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
if (check_version(&info))
return -EINVAL;
/*
* The results for waiting ops (SETLKW) can be returned in any
* order, so match all fields to find the op. The results for
* non-waiting ops are returned in the order that they were sent
* to userspace, so match the result with the first non-waiting op.
*/
spin_lock(&ops_lock);
if (info.wait) {
list_for_each_entry(iter, &recv_list, list) {
if (iter->info.fsid == info.fsid &&
iter->info.number == info.number &&
iter->info.owner == info.owner) {
if (iter->sigint) {
list_del(&iter->list);
spin_unlock(&ops_lock);
pr_debug("%s: sigint cleanup %x %llx pid %d",
__func__, iter->info.fsid,
(unsigned long long)iter->info.number,
iter->info.pid);
do_unlock_close(&iter->info);
memcpy(&iter->info, &info, sizeof(info));
dlm_release_plock_op(iter);
return count;
iter->info.owner == info.owner &&
iter->info.pid == info.pid &&
iter->info.start == info.start &&
iter->info.end == info.end &&
iter->info.ex == info.ex &&
iter->info.wait) {
op = iter;
break;
}
list_del_init(&iter->list);
memcpy(&iter->info, &info, sizeof(info));
if (iter->data)
do_callback = 1;
else
iter->done = 1;
}
} else {
list_for_each_entry(iter, &recv_list, list) {
if (!iter->info.wait) {
op = iter;
break;
}
}
}
if (op) {
/* Sanity check that op and info match. */
if (info.wait)
WARN_ON(op->info.optype != DLM_PLOCK_OP_LOCK);
else
WARN_ON(op->info.fsid != info.fsid ||
op->info.number != info.number ||
op->info.owner != info.owner ||
op->info.optype != info.optype);
list_del_init(&op->list);
memcpy(&op->info, &info, sizeof(info));
if (op->data)
do_callback = 1;
else
op->done = 1;
}
spin_unlock(&ops_lock);
if (op) {
......@@ -463,7 +488,7 @@ static ssize_t dev_write(struct file *file, const char __user *u, size_t count,
else
wake_up(&recv_wq);
} else
log_print("%s: no op %x %llx", __func__,
pr_debug("%s: no op %x %llx", __func__,
info.fsid, (unsigned long long)info.number);
return count;
}
......
......@@ -145,6 +145,24 @@ static void compat_output(struct dlm_lock_result *res,
}
#endif
/* should held proc->asts_spin lock */
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
{
struct dlm_callback *cb, *safe;
list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
list_del(&cb->list);
kref_put(&cb->ref, dlm_release_callback);
}
clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
/* invalidate */
dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
lkb->lkb_last_bast_mode = -1;
}
/* Figure out if this lock is at the end of its life and no longer
available for the application to use. The lkb still exists until
the final ast is read. A lock becomes EOL in three situations:
......
......@@ -6,6 +6,7 @@
#ifndef __USER_DOT_H__
#define __USER_DOT_H__
void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb);
void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
int status, uint32_t sbflags);
int dlm_user_init(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment