Commit 16fa29ae authored by David S. Miller's avatar David S. Miller

Merge branch 'smc-fixes'

Dust Li says:

====================
net/smc: fix kernel panic caused by race of smc_sock

This patchset fixes the race between smc_release triggered by
close(2) and cdc_handle triggered by underlaying RDMA device.

The race is caused because the smc_connection may been released
before the pending tx CDC messages got its CQEs. In order to fix
this, I add a counter to track how many pending WRs we have posted
through the smc_connection, and only release the smc_connection
after there is no pending WRs on the connection.

The first patch prevents posting WR on a QP that is not in RTS
state. This patch is needed because if we post WR on a QP that
is not in RTS state, ib_post_send() may success but no CQE will
return, and that will confuse the counter tracking the pending
WRs.

The second patch add a counter to track how many WRs were posted
through the smc_connection, and don't reset the QP on link destroying
to prevent leak of the counter.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 1b9dadba 349d4312
......@@ -180,6 +180,11 @@ struct smc_connection {
u16 tx_cdc_seq; /* sequence # for CDC send */
u16 tx_cdc_seq_fin; /* sequence # - tx completed */
spinlock_t send_lock; /* protect wr_sends */
atomic_t cdc_pend_tx_wr; /* number of pending tx CDC wqe
* - inc when post wqe,
* - dec on polled tx cqe
*/
wait_queue_head_t cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr*/
struct delayed_work tx_work; /* retry of smc_cdc_msg_send */
u32 tx_off; /* base offset in peer rmb */
......
......@@ -31,10 +31,6 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
struct smc_sock *smc;
int diff;
if (!conn)
/* already dismissed */
return;
smc = container_of(conn, struct smc_sock, conn);
bh_lock_sock(&smc->sk);
if (!wc_status) {
......@@ -51,6 +47,12 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
conn);
conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
}
if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
wake_up(&conn->cdc_pend_tx_wq);
WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
smc_tx_sndbuf_nonfull(smc);
bh_unlock_sock(&smc->sk);
}
......@@ -107,6 +109,10 @@ int smc_cdc_msg_send(struct smc_connection *conn,
conn->tx_cdc_seq++;
conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);
atomic_inc(&conn->cdc_pend_tx_wr);
smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
if (!rc) {
smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
......@@ -114,6 +120,7 @@ int smc_cdc_msg_send(struct smc_connection *conn,
} else {
conn->tx_cdc_seq--;
conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
atomic_dec(&conn->cdc_pend_tx_wr);
}
return rc;
......@@ -136,7 +143,18 @@ int smcr_cdc_msg_send_validation(struct smc_connection *conn,
peer->token = htonl(local->token);
peer->prod_flags.failover_validation = 1;
/* We need to set pend->conn here to make sure smc_cdc_tx_handler()
* can handle properly
*/
smc_cdc_add_pending_send(conn, pend);
atomic_inc(&conn->cdc_pend_tx_wr);
smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
if (unlikely(rc))
atomic_dec(&conn->cdc_pend_tx_wr);
return rc;
}
......@@ -193,31 +211,9 @@ int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
return rc;
}
static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
unsigned long data)
void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
{
struct smc_connection *conn = (struct smc_connection *)data;
struct smc_cdc_tx_pend *cdc_pend =
(struct smc_cdc_tx_pend *)tx_pend;
return cdc_pend->conn == conn;
}
static void smc_cdc_tx_dismisser(struct smc_wr_tx_pend_priv *tx_pend)
{
struct smc_cdc_tx_pend *cdc_pend =
(struct smc_cdc_tx_pend *)tx_pend;
cdc_pend->conn = NULL;
}
void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
{
struct smc_link *link = conn->lnk;
smc_wr_tx_dismiss_slots(link, SMC_CDC_MSG_TYPE,
smc_cdc_tx_filter, smc_cdc_tx_dismisser,
(unsigned long)conn);
wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
}
/* Send a SMC-D CDC header.
......
......@@ -291,7 +291,7 @@ int smc_cdc_get_free_slot(struct smc_connection *conn,
struct smc_wr_buf **wr_buf,
struct smc_rdma_wr **wr_rdma_buf,
struct smc_cdc_tx_pend **pend);
void smc_cdc_tx_dismiss_slots(struct smc_connection *conn);
void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn);
int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf,
struct smc_cdc_tx_pend *pend);
int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn);
......
......@@ -647,7 +647,7 @@ static void smcr_lgr_link_deactivate_all(struct smc_link_group *lgr)
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
struct smc_link *lnk = &lgr->lnk[i];
if (smc_link_usable(lnk))
if (smc_link_sendable(lnk))
lnk->state = SMC_LNK_INACTIVE;
}
wake_up_all(&lgr->llc_msg_waiter);
......@@ -1127,7 +1127,7 @@ void smc_conn_free(struct smc_connection *conn)
smc_ism_unset_conn(conn);
tasklet_kill(&conn->rx_tsklet);
} else {
smc_cdc_tx_dismiss_slots(conn);
smc_cdc_wait_pend_tx_wr(conn);
if (current_work() != &conn->abort_work)
cancel_work_sync(&conn->abort_work);
}
......@@ -1204,7 +1204,7 @@ void smcr_link_clear(struct smc_link *lnk, bool log)
smc_llc_link_clear(lnk, log);
smcr_buf_unmap_lgr(lnk);
smcr_rtoken_clear_link(lnk);
smc_ib_modify_qp_reset(lnk);
smc_ib_modify_qp_error(lnk);
smc_wr_free_link(lnk);
smc_ib_destroy_queue_pair(lnk);
smc_ib_dealloc_protection_domain(lnk);
......@@ -1336,7 +1336,7 @@ static void smc_conn_kill(struct smc_connection *conn, bool soft)
else
tasklet_unlock_wait(&conn->rx_tsklet);
} else {
smc_cdc_tx_dismiss_slots(conn);
smc_cdc_wait_pend_tx_wr(conn);
}
smc_lgr_unregister_conn(conn);
smc_close_active_abort(smc);
......@@ -1459,11 +1459,16 @@ void smc_smcd_terminate_all(struct smcd_dev *smcd)
/* Called when an SMCR device is removed or the smc module is unloaded.
* If smcibdev is given, all SMCR link groups using this device are terminated.
* If smcibdev is NULL, all SMCR link groups are terminated.
*
* We must wait here for QPs been destroyed before we destroy the CQs,
* or we won't received any CQEs and cdc_pend_tx_wr cannot reach 0 thus
* smc_sock cannot be released.
*/
void smc_smcr_terminate_all(struct smc_ib_device *smcibdev)
{
struct smc_link_group *lgr, *lg;
LIST_HEAD(lgr_free_list);
LIST_HEAD(lgr_linkdown_list);
int i;
spin_lock_bh(&smc_lgr_list.lock);
......@@ -1475,7 +1480,7 @@ void smc_smcr_terminate_all(struct smc_ib_device *smcibdev)
list_for_each_entry_safe(lgr, lg, &smc_lgr_list.list, list) {
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
if (lgr->lnk[i].smcibdev == smcibdev)
smcr_link_down_cond_sched(&lgr->lnk[i]);
list_move_tail(&lgr->list, &lgr_linkdown_list);
}
}
}
......@@ -1487,6 +1492,16 @@ void smc_smcr_terminate_all(struct smc_ib_device *smcibdev)
__smc_lgr_terminate(lgr, false);
}
list_for_each_entry_safe(lgr, lg, &lgr_linkdown_list, list) {
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
if (lgr->lnk[i].smcibdev == smcibdev) {
mutex_lock(&lgr->llc_conf_mutex);
smcr_link_down_cond(&lgr->lnk[i]);
mutex_unlock(&lgr->llc_conf_mutex);
}
}
}
if (smcibdev) {
if (atomic_read(&smcibdev->lnk_cnt))
wait_event(smcibdev->lnks_deleted,
......@@ -1586,7 +1601,6 @@ static void smcr_link_down(struct smc_link *lnk)
if (!lgr || lnk->state == SMC_LNK_UNUSED || list_empty(&lgr->list))
return;
smc_ib_modify_qp_reset(lnk);
to_lnk = smc_switch_conns(lgr, lnk, true);
if (!to_lnk) { /* no backup link available */
smcr_link_clear(lnk, true);
......@@ -1824,6 +1838,7 @@ int smc_conn_create(struct smc_sock *smc, struct smc_init_info *ini)
conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE;
conn->local_tx_ctrl.len = SMC_WR_TX_SIZE;
conn->urg_state = SMC_URG_READ;
init_waitqueue_head(&conn->cdc_pend_tx_wq);
INIT_WORK(&smc->conn.abort_work, smc_conn_abort_work);
if (ini->is_smcd) {
conn->rx_off = sizeof(struct smcd_cdc_msg);
......
......@@ -415,6 +415,12 @@ static inline bool smc_link_usable(struct smc_link *lnk)
return true;
}
static inline bool smc_link_sendable(struct smc_link *lnk)
{
return smc_link_usable(lnk) &&
lnk->qp_attr.cur_qp_state == IB_QPS_RTS;
}
static inline bool smc_link_active(struct smc_link *lnk)
{
return lnk->state == SMC_LNK_ACTIVE;
......
......@@ -109,12 +109,12 @@ int smc_ib_modify_qp_rts(struct smc_link *lnk)
IB_QP_MAX_QP_RD_ATOMIC);
}
int smc_ib_modify_qp_reset(struct smc_link *lnk)
int smc_ib_modify_qp_error(struct smc_link *lnk)
{
struct ib_qp_attr qp_attr;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IB_QPS_RESET;
qp_attr.qp_state = IB_QPS_ERR;
return ib_modify_qp(lnk->roce_qp, &qp_attr, IB_QP_STATE);
}
......
......@@ -90,6 +90,7 @@ int smc_ib_create_queue_pair(struct smc_link *lnk);
int smc_ib_ready_link(struct smc_link *lnk);
int smc_ib_modify_qp_rts(struct smc_link *lnk);
int smc_ib_modify_qp_reset(struct smc_link *lnk);
int smc_ib_modify_qp_error(struct smc_link *lnk);
long smc_ib_setup_per_ibdev(struct smc_ib_device *smcibdev);
int smc_ib_get_memory_region(struct ib_pd *pd, int access_flags,
struct smc_buf_desc *buf_slot, u8 link_idx);
......
......@@ -1630,7 +1630,7 @@ void smc_llc_send_link_delete_all(struct smc_link_group *lgr, bool ord, u32 rsn)
delllc.reason = htonl(rsn);
for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
if (!smc_link_usable(&lgr->lnk[i]))
if (!smc_link_sendable(&lgr->lnk[i]))
continue;
if (!smc_llc_send_message_wait(&lgr->lnk[i], &delllc))
break;
......
......@@ -62,13 +62,9 @@ static inline bool smc_wr_is_tx_pend(struct smc_link *link)
}
/* wait till all pending tx work requests on the given link are completed */
int smc_wr_tx_wait_no_pending_sends(struct smc_link *link)
void smc_wr_tx_wait_no_pending_sends(struct smc_link *link)
{
if (wait_event_timeout(link->wr_tx_wait, !smc_wr_is_tx_pend(link),
SMC_WR_TX_WAIT_PENDING_TIME))
return 0;
else /* timeout */
return -EPIPE;
wait_event(link->wr_tx_wait, !smc_wr_is_tx_pend(link));
}
static inline int smc_wr_tx_find_pending_index(struct smc_link *link, u64 wr_id)
......@@ -87,7 +83,6 @@ static inline void smc_wr_tx_process_cqe(struct ib_wc *wc)
struct smc_wr_tx_pend pnd_snd;
struct smc_link *link;
u32 pnd_snd_idx;
int i;
link = wc->qp->qp_context;
......@@ -128,14 +123,6 @@ static inline void smc_wr_tx_process_cqe(struct ib_wc *wc)
}
if (wc->status) {
for_each_set_bit(i, link->wr_tx_mask, link->wr_tx_cnt) {
/* clear full struct smc_wr_tx_pend including .priv */
memset(&link->wr_tx_pends[i], 0,
sizeof(link->wr_tx_pends[i]));
memset(&link->wr_tx_bufs[i], 0,
sizeof(link->wr_tx_bufs[i]));
clear_bit(i, link->wr_tx_mask);
}
if (link->lgr->smc_version == SMC_V2) {
memset(link->wr_tx_v2_pend, 0,
sizeof(*link->wr_tx_v2_pend));
......@@ -188,7 +175,7 @@ void smc_wr_tx_cq_handler(struct ib_cq *ib_cq, void *cq_context)
static inline int smc_wr_tx_get_free_slot_index(struct smc_link *link, u32 *idx)
{
*idx = link->wr_tx_cnt;
if (!smc_link_usable(link))
if (!smc_link_sendable(link))
return -ENOLINK;
for_each_clear_bit(*idx, link->wr_tx_mask, link->wr_tx_cnt) {
if (!test_and_set_bit(*idx, link->wr_tx_mask))
......@@ -231,7 +218,7 @@ int smc_wr_tx_get_free_slot(struct smc_link *link,
} else {
rc = wait_event_interruptible_timeout(
link->wr_tx_wait,
!smc_link_usable(link) ||
!smc_link_sendable(link) ||
lgr->terminating ||
(smc_wr_tx_get_free_slot_index(link, &idx) != -EBUSY),
SMC_WR_TX_WAIT_FREE_SLOT_TIME);
......@@ -421,25 +408,6 @@ int smc_wr_reg_send(struct smc_link *link, struct ib_mr *mr)
return rc;
}
void smc_wr_tx_dismiss_slots(struct smc_link *link, u8 wr_tx_hdr_type,
smc_wr_tx_filter filter,
smc_wr_tx_dismisser dismisser,
unsigned long data)
{
struct smc_wr_tx_pend_priv *tx_pend;
struct smc_wr_rx_hdr *wr_tx;
int i;
for_each_set_bit(i, link->wr_tx_mask, link->wr_tx_cnt) {
wr_tx = (struct smc_wr_rx_hdr *)&link->wr_tx_bufs[i];
if (wr_tx->type != wr_tx_hdr_type)
continue;
tx_pend = &link->wr_tx_pends[i].priv;
if (filter(tx_pend, data))
dismisser(tx_pend);
}
}
/****************************** receive queue ********************************/
int smc_wr_rx_register_handler(struct smc_wr_rx_handler *handler)
......@@ -675,10 +643,7 @@ void smc_wr_free_link(struct smc_link *lnk)
smc_wr_wakeup_reg_wait(lnk);
smc_wr_wakeup_tx_wait(lnk);
if (smc_wr_tx_wait_no_pending_sends(lnk))
memset(lnk->wr_tx_mask, 0,
BITS_TO_LONGS(SMC_WR_BUF_CNT) *
sizeof(*lnk->wr_tx_mask));
smc_wr_tx_wait_no_pending_sends(lnk);
wait_event(lnk->wr_reg_wait, (!atomic_read(&lnk->wr_reg_refcnt)));
wait_event(lnk->wr_tx_wait, (!atomic_read(&lnk->wr_tx_refcnt)));
......
......@@ -22,7 +22,6 @@
#define SMC_WR_BUF_CNT 16 /* # of ctrl buffers per link */
#define SMC_WR_TX_WAIT_FREE_SLOT_TIME (10 * HZ)
#define SMC_WR_TX_WAIT_PENDING_TIME (5 * HZ)
#define SMC_WR_TX_SIZE 44 /* actual size of wr_send data (<=SMC_WR_BUF_SIZE) */
......@@ -62,7 +61,7 @@ static inline void smc_wr_tx_set_wr_id(atomic_long_t *wr_tx_id, long val)
static inline bool smc_wr_tx_link_hold(struct smc_link *link)
{
if (!smc_link_usable(link))
if (!smc_link_sendable(link))
return false;
atomic_inc(&link->wr_tx_refcnt);
return true;
......@@ -130,7 +129,7 @@ void smc_wr_tx_dismiss_slots(struct smc_link *lnk, u8 wr_rx_hdr_type,
smc_wr_tx_filter filter,
smc_wr_tx_dismisser dismisser,
unsigned long data);
int smc_wr_tx_wait_no_pending_sends(struct smc_link *link);
void smc_wr_tx_wait_no_pending_sends(struct smc_link *link);
int smc_wr_rx_register_handler(struct smc_wr_rx_handler *handler);
int smc_wr_rx_post_init(struct smc_link *link);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment