Commit f0b60bfa authored by Linus Torvalds

Merge tag 'dlm-4.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set focuses, as usual, on fixes to the comms layer.

  New testing of the dlm with ocfs2 uncovered a number of bugs in the
  TCP connection handling during recovery, starting, and stopping"

* tag 'dlm-4.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: remove dlm_send_rcom_lookup_dump
  dlm: recheck kthread_should_stop() before schedule()
  DLM: fix NULL pointer dereference in send_to_sock()
  DLM: fix to reschedule rwork
  DLM: fix to use sk_callback_lock correctly
  DLM: fix overflow dlm_cb_seq
  DLM: fix memory leak in tcp_accept_from_sock()
  DLM: fix conversion deadlock when DLM_LKF_NODLCKWT flag is set
  DLM: use CF_CLOSE flag to stop dlm_send correctly
  DLM: Reanimate CF_WRITE_PENDING flag
  DLM: fix race condition between dlm_recoverd_stop and dlm_recoverd
  DLM: close othercon at send/receive error
  DLM: retry rcom when dlm_wait_function is timed out.
  DLM: fix to use sock_mutex correctly in xxx_accept_from_sock
  DLM: fix race condition between dlm_send and dlm_recv
  DLM: fix double list_del()
  DLM: fix remove save_cb argument from add_sock()
  DLM: Fix saving of NULL callbacks
  DLM: Eliminate CF_WRITE_PENDING flag
  DLM: Eliminate CF_CONNECT_PENDING flag
parents 29309a4e 9250e523
fs/dlm/ast.c
@@ -181,6 +181,8 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
 	spin_lock(&dlm_cb_seq_spin);
 	new_seq = ++dlm_cb_seq;
+	if (!dlm_cb_seq)
+		new_seq = ++dlm_cb_seq;
 	spin_unlock(&dlm_cb_seq_spin);
 
 	if (lkb->lkb_flags & DLM_IFL_USER) {
......
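The ast.c hunk above keeps the callback sequence counter from handing out zero after it wraps around, since a zero sequence value appears to be reserved to mean "no callback". A standalone sketch of the same guard, with illustrative names rather than the kernel's (plain C, not the dlm code itself):

#include <stdint.h>
#include <stdio.h>

/* Illustrative only: the counter name and type are made up; the point is
 * the wrap-around guard added in the hunk above. */
static uint32_t cb_seq;			/* global sequence counter, 0 = "unused" */

static uint32_t next_cb_seq(void)
{
	uint32_t new_seq = ++cb_seq;

	if (!cb_seq)			/* wrapped to 0: advance once more */
		new_seq = ++cb_seq;
	return new_seq;
}

int main(void)
{
	cb_seq = UINT32_MAX;		/* force the wrap on the next call */
	printf("%u\n", next_cb_seq());	/* prints 1, never 0 */
	return 0;
}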
fs/dlm/lock.c
@@ -1003,7 +1003,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
 		if (r->res_master_nodeid == our_nodeid) {
 			log_error(ls, "from_master %d our_master", from_nodeid);
 			dlm_dump_rsb(r);
-			dlm_send_rcom_lookup_dump(r, from_nodeid);
 			goto out_found;
 		}
@@ -2465,14 +2464,12 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
 			lkb->lkb_grmode = DLM_LOCK_NL;
 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
-		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
-			if (err)
-				*err = -EDEADLK;
-			else {
-				log_print("can_be_granted deadlock %x now %d",
-					  lkb->lkb_id, now);
-				dlm_dump_rsb(r);
-			}
+		} else if (err) {
+			*err = -EDEADLK;
+		} else {
+			log_print("can_be_granted deadlock %x now %d",
+				  lkb->lkb_id, now);
+			dlm_dump_rsb(r);
 		}
 		goto out;
 	}
@@ -2501,13 +2498,6 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 	return rv;
 }
 
-/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
-   for locks pending on the convert list. Once verified (watch for these
-   log_prints), we should be able to just call _can_be_granted() and not
-   bother with the demote/deadlk cases here (and there's no easy way to deal
-   with a deadlk here, we'd have to generate something like grant_lock with
-   the deadlk error.) */
-
 /* Returns the highest requested mode of all blocked conversions; sets
    cw if there's a blocked conversion to DLM_LOCK_CW. */
@@ -2545,9 +2535,22 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
 		}
 
 		if (deadlk) {
-			log_print("WARN: pending deadlock %x node %d %s",
-				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
-			dlm_dump_rsb(r);
+			/*
+			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
+			 * deadlock is detected, we request a blocking AST so the
+			 * holder can down-convert (or cancel) the conversion.
+			 */
+			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
+				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
+					queue_bast(r, lkb, lkb->lkb_rqmode);
+					lkb->lkb_highbast = lkb->lkb_rqmode;
+				}
+			} else {
+				log_print("WARN: pending deadlock %x node %d %s",
+					  lkb->lkb_id, lkb->lkb_nodeid,
+					  r->res_name);
+				dlm_dump_rsb(r);
+			}
 			continue;
 		}
@@ -3123,7 +3126,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
 	   deadlock, so we leave it on the granted queue and return EDEADLK in
 	   the ast for the convert. */
 
-	if (deadlk) {
+	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
 		/* it's left on the granted queue */
 		revert_lock(r, lkb);
 		queue_cast(r, lkb, -EDEADLK);
......
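Taken together, the lock.c hunks change what happens when a conversion deadlock is detected: with DLM_LKF_NODLCKWT the convert is no longer failed with -EDEADLK; the lock keeps waiting and the holder is sent a blocking AST so it can down-convert or cancel. A rough, self-contained summary of the resulting policy (the helper and the flag values here are illustrative, not the kernel's):

#include <stdio.h>

/* Placeholder bit values, NOT the ones defined in <linux/dlm.h>. */
#define DLM_LKF_CONVDEADLK	(1 << 0)
#define DLM_LKF_NODLCKWT	(1 << 1)

enum deadlk_action { DEMOTE_TO_NL, FAIL_EDEADLK, SEND_BLOCKING_AST };

/* Roughly the policy after this merge; the real checks live in
 * can_be_granted(), grant_pending_convert() and do_convert(). */
static enum deadlk_action on_conversion_deadlock(unsigned int exflags)
{
	if (exflags & DLM_LKF_CONVDEADLK)
		return DEMOTE_TO_NL;		/* demote to NL, set DLM_SBF_DEMOTED */
	if (exflags & DLM_LKF_NODLCKWT)
		return SEND_BLOCKING_AST;	/* keep waiting, queue_bast() the holder */
	return FAIL_EDEADLK;			/* the convert fails with -EDEADLK */
}

int main(void)
{
	printf("%d\n", on_conversion_deadlock(DLM_LKF_NODLCKWT));
	return 0;
}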
fs/dlm/lowcomms.c
@@ -107,11 +107,11 @@ struct connection {
 	unsigned long flags;
 #define CF_READ_PENDING 1
 #define CF_WRITE_PENDING 2
-#define CF_CONNECT_PENDING 3
 #define CF_INIT_PENDING 4
 #define CF_IS_OTHERCON 5
 #define CF_CLOSE 6
 #define CF_APP_LIMITED 7
+#define CF_CLOSING 8
 	struct list_head writequeue;	/* List of outgoing writequeue_entries */
 	spinlock_t writequeue_lock;
 	int (*rx_action) (struct connection *);	/* What to do when active */
@@ -124,10 +124,6 @@ struct connection {
 	struct connection *othercon;
 	struct work_struct rwork; /* Receive workqueue */
 	struct work_struct swork; /* Send workqueue */
-	void (*orig_error_report)(struct sock *);
-	void (*orig_data_ready)(struct sock *);
-	void (*orig_state_change)(struct sock *);
-	void (*orig_write_space)(struct sock *);
 };
 
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -150,6 +146,13 @@ struct dlm_node_addr {
 	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
 };
 
+static struct listen_sock_callbacks {
+	void (*sk_error_report)(struct sock *);
+	void (*sk_data_ready)(struct sock *);
+	void (*sk_state_change)(struct sock *);
+	void (*sk_write_space)(struct sock *);
+} listen_sock;
+
 static LIST_HEAD(dlm_node_addrs);
 static DEFINE_SPINLOCK(dlm_node_addrs_spin);
@@ -408,17 +411,23 @@ int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 /* Data available on socket or listen socket received a connect */
 static void lowcomms_data_ready(struct sock *sk)
 {
-	struct connection *con = sock2con(sk);
+	struct connection *con;
 
+	read_lock_bh(&sk->sk_callback_lock);
+	con = sock2con(sk);
 	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 		queue_work(recv_workqueue, &con->rwork);
+	read_unlock_bh(&sk->sk_callback_lock);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
-	struct connection *con = sock2con(sk);
+	struct connection *con;
 
+	read_lock_bh(&sk->sk_callback_lock);
+	con = sock2con(sk);
 	if (!con)
-		return;
+		goto out;
 
 	clear_bit(SOCK_NOSPACE, &con->sock->flags);
@@ -427,16 +436,17 @@ static void lowcomms_write_space(struct sock *sk)
 		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 	}
 
-	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-		queue_work(send_workqueue, &con->swork);
+	queue_work(send_workqueue, &con->swork);
+out:
+	read_unlock_bh(&sk->sk_callback_lock);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
 	if (test_bit(CF_CLOSE, &con->flags))
 		return;
-	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-		queue_work(send_workqueue, &con->swork);
+	queue_work(send_workqueue, &con->swork);
+	cond_resched();
 }
 
 static void lowcomms_state_change(struct sock *sk)
@@ -480,7 +490,7 @@ static void lowcomms_error_report(struct sock *sk)
 	if (con == NULL)
 		goto out;
 
-	orig_report = con->orig_error_report;
+	orig_report = listen_sock.sk_error_report;
 	if (con->sock == NULL ||
 	    kernel_getpeername(con->sock, (struct sockaddr *)&saddr, &buflen)) {
 		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
@@ -517,27 +527,31 @@
 }
 
-/* Note: sk_callback_lock must be locked before calling this function. */
-static void save_callbacks(struct connection *con, struct sock *sk)
+static void save_listen_callbacks(struct socket *sock)
 {
-	con->orig_data_ready = sk->sk_data_ready;
-	con->orig_state_change = sk->sk_state_change;
-	con->orig_write_space = sk->sk_write_space;
-	con->orig_error_report = sk->sk_error_report;
+	struct sock *sk = sock->sk;
+
+	listen_sock.sk_data_ready = sk->sk_data_ready;
+	listen_sock.sk_state_change = sk->sk_state_change;
+	listen_sock.sk_write_space = sk->sk_write_space;
+	listen_sock.sk_error_report = sk->sk_error_report;
 }
 
-static void restore_callbacks(struct connection *con, struct sock *sk)
+static void restore_callbacks(struct socket *sock)
 {
+	struct sock *sk = sock->sk;
+
 	write_lock_bh(&sk->sk_callback_lock);
 	sk->sk_user_data = NULL;
-	sk->sk_data_ready = con->orig_data_ready;
-	sk->sk_state_change = con->orig_state_change;
-	sk->sk_write_space = con->orig_write_space;
-	sk->sk_error_report = con->orig_error_report;
+	sk->sk_data_ready = listen_sock.sk_data_ready;
+	sk->sk_state_change = listen_sock.sk_state_change;
+	sk->sk_write_space = listen_sock.sk_write_space;
+	sk->sk_error_report = listen_sock.sk_error_report;
 	write_unlock_bh(&sk->sk_callback_lock);
 }
 
 /* Make a socket active */
-static void add_sock(struct socket *sock, struct connection *con, bool save_cb)
+static void add_sock(struct socket *sock, struct connection *con)
 {
 	struct sock *sk = sock->sk;
@@ -545,8 +559,6 @@ static void add_sock(struct socket *sock, struct connection *con, bool save_cb)
 	con->sock = sock;
 	sk->sk_user_data = con;
-	if (save_cb)
-		save_callbacks(con, sk);
 	/* Install a data_ready callback */
 	sk->sk_data_ready = lowcomms_data_ready;
 	sk->sk_write_space = lowcomms_write_space;
@@ -579,17 +591,20 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 static void close_connection(struct connection *con, bool and_other,
 			     bool tx, bool rx)
 {
-	clear_bit(CF_CONNECT_PENDING, &con->flags);
-	clear_bit(CF_WRITE_PENDING, &con->flags);
-	if (tx && cancel_work_sync(&con->swork))
+	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+
+	if (tx && !closing && cancel_work_sync(&con->swork)) {
 		log_print("canceled swork for node %d", con->nodeid);
-	if (rx && cancel_work_sync(&con->rwork))
+		clear_bit(CF_WRITE_PENDING, &con->flags);
+	}
+	if (rx && !closing && cancel_work_sync(&con->rwork)) {
 		log_print("canceled rwork for node %d", con->nodeid);
+		clear_bit(CF_READ_PENDING, &con->flags);
+	}
 
 	mutex_lock(&con->sock_mutex);
 	if (con->sock) {
-		if (!test_bit(CF_IS_OTHERCON, &con->flags))
-			restore_callbacks(con, con->sock->sk);
+		restore_callbacks(con->sock);
 		sock_release(con->sock);
 		con->sock = NULL;
 	}
@@ -604,6 +619,7 @@ static void close_connection(struct connection *con, bool and_other,
 	con->retries = 0;
 	mutex_unlock(&con->sock_mutex);
+	clear_bit(CF_CLOSING, &con->flags);
 }
 
 /* Data received from remote end */
@@ -700,7 +716,7 @@ static int receive_from_sock(struct connection *con)
 out_close:
 	mutex_unlock(&con->sock_mutex);
 	if (ret != -EAGAIN) {
-		close_connection(con, false, true, false);
+		close_connection(con, true, true, false);
 		/* Reconnect when there is something to send */
 	}
 	/* Don't return success if we really got EOF */
@@ -728,22 +744,14 @@ static int tcp_accept_from_sock(struct connection *con)
 	}
 	mutex_unlock(&connections_lock);
 
-	memset(&peeraddr, 0, sizeof(peeraddr));
-	result = sock_create_lite(dlm_local_addr[0]->ss_family,
-				  SOCK_STREAM, IPPROTO_TCP, &newsock);
-	if (result < 0)
-		return -ENOMEM;
-
 	mutex_lock_nested(&con->sock_mutex, 0);
 
-	result = -ENOTCONN;
-	if (con->sock == NULL)
-		goto accept_err;
-
-	newsock->type = con->sock->type;
-	newsock->ops = con->sock->ops;
+	if (!con->sock) {
+		mutex_unlock(&con->sock_mutex);
+		return -ENOTCONN;
+	}
 
-	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK, true);
+	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
 	if (result < 0)
 		goto accept_err;
@@ -794,31 +802,33 @@ static int tcp_accept_from_sock(struct connection *con)
 			othercon->nodeid = nodeid;
 			othercon->rx_action = receive_from_sock;
 			mutex_init(&othercon->sock_mutex);
+			INIT_LIST_HEAD(&othercon->writequeue);
+			spin_lock_init(&othercon->writequeue_lock);
 			INIT_WORK(&othercon->swork, process_send_sockets);
 			INIT_WORK(&othercon->rwork, process_recv_sockets);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
 		}
+		mutex_lock_nested(&othercon->sock_mutex, 2);
 		if (!othercon->sock) {
 			newcon->othercon = othercon;
-			othercon->sock = newsock;
-			newsock->sk->sk_user_data = othercon;
-			add_sock(newsock, othercon, false);
+			add_sock(newsock, othercon);
 			addcon = othercon;
+			mutex_unlock(&othercon->sock_mutex);
 		}
 		else {
 			printk("Extra connection from node %d attempted\n", nodeid);
 			result = -EAGAIN;
+			mutex_unlock(&othercon->sock_mutex);
 			mutex_unlock(&newcon->sock_mutex);
 			goto accept_err;
 		}
 	}
 	else {
-		newsock->sk->sk_user_data = newcon;
 		newcon->rx_action = receive_from_sock;
 		/* accept copies the sk after we've saved the callbacks, so we
 		   don't want to save them a second time or comm errors will
 		   result in calling sk_error_report recursively. */
-		add_sock(newsock, newcon, false);
+		add_sock(newsock, newcon);
 		addcon = newcon;
 	}
@@ -837,7 +847,8 @@ static int tcp_accept_from_sock(struct connection *con)
 accept_err:
 	mutex_unlock(&con->sock_mutex);
-	sock_release(newsock);
+	if (newsock)
+		sock_release(newsock);
 
 	if (result != -EAGAIN)
 		log_print("error accepting connection from node: %d", result);
@@ -911,26 +922,28 @@ static int sctp_accept_from_sock(struct connection *con)
 			othercon->nodeid = nodeid;
 			othercon->rx_action = receive_from_sock;
 			mutex_init(&othercon->sock_mutex);
+			INIT_LIST_HEAD(&othercon->writequeue);
+			spin_lock_init(&othercon->writequeue_lock);
 			INIT_WORK(&othercon->swork, process_send_sockets);
 			INIT_WORK(&othercon->rwork, process_recv_sockets);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
 		}
+		mutex_lock_nested(&othercon->sock_mutex, 2);
 		if (!othercon->sock) {
 			newcon->othercon = othercon;
-			othercon->sock = newsock;
-			newsock->sk->sk_user_data = othercon;
-			add_sock(newsock, othercon, false);
+			add_sock(newsock, othercon);
 			addcon = othercon;
+			mutex_unlock(&othercon->sock_mutex);
 		} else {
 			printk("Extra connection from node %d attempted\n", nodeid);
 			ret = -EAGAIN;
+			mutex_unlock(&othercon->sock_mutex);
 			mutex_unlock(&newcon->sock_mutex);
 			goto accept_err;
 		}
 	} else {
-		newsock->sk->sk_user_data = newcon;
 		newcon->rx_action = receive_from_sock;
-		add_sock(newsock, newcon, false);
+		add_sock(newsock, newcon);
 		addcon = newcon;
 	}
@@ -1055,10 +1068,9 @@ static void sctp_connect_to_sock(struct connection *con)
 	if (result < 0)
 		goto socket_err;
 
-	sock->sk->sk_user_data = con;
 	con->rx_action = receive_from_sock;
 	con->connect_action = sctp_connect_to_sock;
-	add_sock(sock, con, true);
+	add_sock(sock, con);
 
 	/* Bind to all addresses. */
 	if (sctp_bind_addrs(con, 0))
@@ -1079,7 +1091,6 @@ static void sctp_connect_to_sock(struct connection *con)
 	if (result == 0)
 		goto out;
 
-
 bind_err:
 	con->sock = NULL;
 	sock_release(sock);
@@ -1098,14 +1109,12 @@ static void sctp_connect_to_sock(struct connection *con)
 			  con->retries, result);
 		mutex_unlock(&con->sock_mutex);
 		msleep(1000);
-		clear_bit(CF_CONNECT_PENDING, &con->flags);
 		lowcomms_connect_sock(con);
 		return;
 	}
 
 out:
 	mutex_unlock(&con->sock_mutex);
-	set_bit(CF_WRITE_PENDING, &con->flags);
 }
 
 /* Connect a new socket to its peer */
@@ -1143,10 +1152,9 @@ static void tcp_connect_to_sock(struct connection *con)
 		goto out_err;
 	}
 
-	sock->sk->sk_user_data = con;
 	con->rx_action = receive_from_sock;
 	con->connect_action = tcp_connect_to_sock;
-	add_sock(sock, con, true);
+	add_sock(sock, con);
 
 	/* Bind to our cluster-known address connecting to avoid
 	   routing problems */
@@ -1194,13 +1202,11 @@ static void tcp_connect_to_sock(struct connection *con)
 			  con->retries, result);
 		mutex_unlock(&con->sock_mutex);
 		msleep(1000);
-		clear_bit(CF_CONNECT_PENDING, &con->flags);
 		lowcomms_connect_sock(con);
 		return;
 	}
 out:
 	mutex_unlock(&con->sock_mutex);
-	set_bit(CF_WRITE_PENDING, &con->flags);
 	return;
 }
@@ -1235,10 +1241,12 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
 	if (result < 0) {
 		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
 	}
+	write_lock_bh(&sock->sk->sk_callback_lock);
 	sock->sk->sk_user_data = con;
+	save_listen_callbacks(sock);
 	con->rx_action = tcp_accept_from_sock;
 	con->connect_action = tcp_connect_to_sock;
+	write_unlock_bh(&sock->sk->sk_callback_lock);
 
 	/* Bind to our port */
 	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
@@ -1320,6 +1328,7 @@ static int sctp_listen_for_all(void)
 	write_lock_bh(&sock->sk->sk_callback_lock);
 	/* Init con struct */
 	sock->sk->sk_user_data = con;
+	save_listen_callbacks(sock);
 	con->sock = sock;
 	con->sock->sk->sk_data_ready = lowcomms_data_ready;
 	con->rx_action = sctp_accept_from_sock;
@@ -1366,7 +1375,7 @@ static int tcp_listen_for_all(void)
 	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
 	if (sock) {
-		add_sock(sock, con, true);
+		add_sock(sock, con);
 		result = 0;
 	}
 	else {
@@ -1456,9 +1465,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	e->len = e->end - e->offset;
 	spin_unlock(&con->writequeue_lock);
 
-	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
-		queue_work(send_workqueue, &con->swork);
-	}
+	queue_work(send_workqueue, &con->swork);
 	return;
 
 out:
@@ -1527,13 +1534,16 @@ static void send_to_sock(struct connection *con)
 
 send_error:
 	mutex_unlock(&con->sock_mutex);
-	close_connection(con, false, false, true);
-	lowcomms_connect_sock(con);
+	close_connection(con, true, false, true);
+	/* Requeue the send work. When the work daemon runs again, it will try
+	   a new connection, then call this function again. */
+	queue_work(send_workqueue, &con->swork);
 	return;
 
 out_connect:
 	mutex_unlock(&con->sock_mutex);
-	lowcomms_connect_sock(con);
+	queue_work(send_workqueue, &con->swork);
+	cond_resched();
 }
 
 static void clean_one_writequeue(struct connection *con)
@@ -1593,9 +1603,10 @@ static void process_send_sockets(struct work_struct *work)
 {
 	struct connection *con = container_of(work, struct connection, swork);
 
-	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
+	clear_bit(CF_WRITE_PENDING, &con->flags);
+	if (con->sock == NULL) /* not mutex protected so check it inside too */
 		con->connect_action(con);
-	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
+	if (!list_empty(&con->writequeue))
 		send_to_sock(con);
 }
@@ -1632,11 +1643,25 @@ static int work_start(void)
 	return 0;
 }
 
-static void stop_conn(struct connection *con)
+static void _stop_conn(struct connection *con, bool and_other)
 {
-	con->flags |= 0x0F;
-	if (con->sock && con->sock->sk)
+	mutex_lock(&con->sock_mutex);
+	set_bit(CF_CLOSE, &con->flags);
+	set_bit(CF_READ_PENDING, &con->flags);
+	set_bit(CF_WRITE_PENDING, &con->flags);
+	if (con->sock && con->sock->sk) {
+		write_lock_bh(&con->sock->sk->sk_callback_lock);
 		con->sock->sk->sk_user_data = NULL;
+		write_unlock_bh(&con->sock->sk->sk_callback_lock);
+	}
+	if (con->othercon && and_other)
+		_stop_conn(con->othercon, false);
+	mutex_unlock(&con->sock_mutex);
+}
+
+static void stop_conn(struct connection *con)
+{
+	_stop_conn(con, true);
 }
 
 static void free_conn(struct connection *con)
@@ -1648,6 +1673,36 @@ static void free_conn(struct connection *con)
 	kmem_cache_free(con_cache, con);
 }
 
+static void work_flush(void)
+{
+	int ok;
+	int i;
+	struct hlist_node *n;
+	struct connection *con;
+
+	flush_workqueue(recv_workqueue);
+	flush_workqueue(send_workqueue);
+	do {
+		ok = 1;
+		foreach_conn(stop_conn);
+		flush_workqueue(recv_workqueue);
+		flush_workqueue(send_workqueue);
+		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
+			hlist_for_each_entry_safe(con, n,
+						  &connection_hash[i], list) {
+				ok &= test_bit(CF_READ_PENDING, &con->flags);
+				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
+				if (con->othercon) {
+					ok &= test_bit(CF_READ_PENDING,
+						       &con->othercon->flags);
+					ok &= test_bit(CF_WRITE_PENDING,
+						       &con->othercon->flags);
+				}
+			}
+		}
+	} while (!ok);
+}
+
 void dlm_lowcomms_stop(void)
 {
 	/* Set all the flags to prevent any
@@ -1655,11 +1710,10 @@ void dlm_lowcomms_stop(void)
 	 */
 	mutex_lock(&connections_lock);
 	dlm_allow_conn = 0;
-	foreach_conn(stop_conn);
+	mutex_unlock(&connections_lock);
+	work_flush();
 	clean_writequeues();
 	foreach_conn(free_conn);
-	mutex_unlock(&connections_lock);
-
 	work_stop();
 
 	kmem_cache_destroy(con_cache);
......
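The common thread in the lowcomms.c changes is that socket callbacks can fire while a connection is being torn down, so sk_user_data is now only read under sk->sk_callback_lock (read side in lowcomms_data_ready()/lowcomms_write_space(), write side wherever it is installed or cleared). A rough user-space analogue of that discipline, with invented names, using a pthread rwlock:

#include <pthread.h>
#include <stddef.h>
#include <stdio.h>

/* Invented stand-ins: "user_data" plays the role of sk_user_data and
 * callback_lock plays the role of sk->sk_callback_lock. */
static pthread_rwlock_t callback_lock = PTHREAD_RWLOCK_INITIALIZER;
static void *user_data = "connection";

static void data_ready(void)
{
	/* read side: the callback may race with teardown */
	pthread_rwlock_rdlock(&callback_lock);
	if (user_data)			/* NULL means already torn down */
		printf("queue receive work for %p\n", user_data);
	pthread_rwlock_unlock(&callback_lock);
}

static void teardown(void)
{
	/* write side: once this returns, callbacks see NULL, not a stale pointer */
	pthread_rwlock_wrlock(&callback_lock);
	user_data = NULL;
	pthread_rwlock_unlock(&callback_lock);
}

int main(void)
{
	data_ready();	/* queues work */
	teardown();
	data_ready();	/* safely does nothing */
	return 0;
}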
fs/dlm/rcom.c
@@ -155,6 +155,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 		goto out;
 	}
 
+retry:
 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
 			    sizeof(struct rcom_status), &rc, &mh);
 	if (error)
@@ -169,6 +170,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 	error = dlm_wait_function(ls, &rcom_response);
 
 	disallow_sync_reply(ls);
+	if (error == -ETIMEDOUT)
+		goto retry;
 	if (error)
 		goto out;
@@ -276,6 +279,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 
 	ls->ls_recover_nodeid = nodeid;
 
+retry:
 	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
 	if (error)
 		goto out;
@@ -288,6 +292,8 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
 	error = dlm_wait_function(ls, &rcom_response);
 
 	disallow_sync_reply(ls);
+	if (error == -ETIMEDOUT)
+		goto retry;
  out:
 	return error;
 }
@@ -332,25 +338,6 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
 	return error;
 }
 
-int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
-{
-	struct dlm_rcom *rc;
-	struct dlm_mhandle *mh;
-	struct dlm_ls *ls = r->res_ls;
-	int error;
-
-	error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
-			    &rc, &mh);
-	if (error)
-		goto out;
-	memcpy(rc->rc_buf, r->res_name, r->res_length);
-	rc->rc_id = 0xFFFFFFFF;
-
-	send_rcom(ls, mh, rc);
- out:
-	return error;
-}
-
 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 {
 	struct dlm_rcom *rc;
@@ -362,6 +349,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 	if (error)
 		return;
 
+	/* Old code would send this special id to trigger a debug dump. */
 	if (rc_in->rc_id == 0xFFFFFFFF) {
 		log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
 		dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
......
fs/dlm/rcom.h
@@ -17,7 +17,6 @@
 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
-int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid);
 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);
......
fs/dlm/recover.c
@@ -52,6 +52,10 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
 					dlm_config.ci_recover_timer * HZ);
 		if (rv)
 			break;
+		if (test_bit(LSFL_RCOM_WAIT, &ls->ls_flags)) {
+			log_debug(ls, "dlm_wait_function timed out");
+			return -ETIMEDOUT;
+		}
 	}
 
 	if (dlm_recovery_stopped(ls)) {
......
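The rcom.c and recover.c hunks work as a pair: dlm_wait_function() now reports -ETIMEDOUT when the LSFL_RCOM_WAIT bit is still set after the timed wait, and dlm_rcom_status()/dlm_rcom_names() respond by resending the request and waiting again instead of failing recovery. A minimal sketch of that send/wait/retry shape (invented names, plain C, not the dlm code):

#include <stdio.h>

#define ETIMEDOUT_SENTINEL (-1)	/* stands in for -ETIMEDOUT */

static int replies_needed = 3;	/* pretend the reply arrives on the 3rd attempt */

static int wait_for_reply(void)
{
	return --replies_needed > 0 ? ETIMEDOUT_SENTINEL : 0;
}

int main(void)
{
	int attempt = 0;

retry:
	attempt++;
	printf("send rcom request, attempt %d\n", attempt);
	if (wait_for_reply() == ETIMEDOUT_SENTINEL)
		goto retry;	/* timed out: resend the same request and wait again */
	printf("reply received after %d attempts\n", attempt);
	return 0;
}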
fs/dlm/recoverd.c
@@ -287,11 +287,23 @@ static int dlm_recoverd(void *arg)
 	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 	wake_up(&ls->ls_recover_lock_wait);
 
-	while (!kthread_should_stop()) {
+	while (1) {
+		/*
+		 * We call kthread_should_stop() after set_current_state().
+		 * This is because it works correctly if kthread_stop() is
+		 * called just before set_current_state().
+		 */
 		set_current_state(TASK_INTERRUPTIBLE);
+		if (kthread_should_stop()) {
+			set_current_state(TASK_RUNNING);
+			break;
+		}
 		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
-		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags))
+		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
+			if (kthread_should_stop())
+				break;
 			schedule();
+		}
 		set_current_state(TASK_RUNNING);
 
 		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
......
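For readability, here is the dlm_recoverd() wait loop as it reads after both recoverd fixes are applied. This is a consolidation of the hunk above as a reading aid, with the comment paraphrased, not an independent, buildable example:

	while (1) {
		/*
		 * Check kthread_should_stop() only after set_current_state():
		 * if kthread_stop() races with this loop, the wakeup it issues
		 * is then not lost before schedule().
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			break;
		}
		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
			if (kthread_should_stop())
				break;
			schedule();
		}
		set_current_state(TASK_RUNNING);
		/* ... handle LSFL_RECOVER_DOWN / LSFL_RECOVER_WORK ... */
	}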