Commit 7e3f2952 authored by Chris Mason, committed by Andy Grover

rds: don't let RDS shutdown a connection while senders are present

This is the first in a long line of patches that tries to fix races
between RDS connection shutdown and RDS traffic.

Here we are maintaining a count of active senders to make sure
the connection doesn't go away while they are using it.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
parent 38a4e5e6
...@@ -148,6 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, ...@@ -148,6 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_send_lock); spin_lock_init(&conn->c_send_lock);
atomic_set(&conn->c_send_generation, 1); atomic_set(&conn->c_send_generation, 1);
atomic_set(&conn->c_senders, 0);
INIT_LIST_HEAD(&conn->c_send_queue); INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans); INIT_LIST_HEAD(&conn->c_retrans);
...@@ -276,6 +277,12 @@ void rds_conn_shutdown(struct rds_connection *conn) ...@@ -276,6 +277,12 @@ void rds_conn_shutdown(struct rds_connection *conn)
spin_lock_irq(&conn->c_send_lock); spin_lock_irq(&conn->c_send_lock);
spin_unlock_irq(&conn->c_send_lock); spin_unlock_irq(&conn->c_send_lock);
while(atomic_read(&conn->c_senders)) {
schedule_timeout(1);
spin_lock_irq(&conn->c_send_lock);
spin_unlock_irq(&conn->c_send_lock);
}
conn->c_trans->conn_shutdown(conn); conn->c_trans->conn_shutdown(conn);
rds_conn_reset(conn); rds_conn_reset(conn);
......
...@@ -863,18 +863,6 @@ int rds_ib_recv(struct rds_connection *conn) ...@@ -863,18 +863,6 @@ int rds_ib_recv(struct rds_connection *conn)
int ret = 0; int ret = 0;
rdsdebug("conn %p\n", conn); rdsdebug("conn %p\n", conn);
/*
* If we get a temporary posting failure in this context then
* we're really low and we want the caller to back off for a bit.
*/
mutex_lock(&ic->i_recv_mutex);
if (rds_ib_recv_refill(conn, 0))
ret = -ENOMEM;
else
rds_ib_stats_inc(s_ib_rx_refill_from_thread);
mutex_unlock(&ic->i_recv_mutex);
if (rds_conn_up(conn)) if (rds_conn_up(conn))
rds_ib_attempt_ack(ic); rds_ib_attempt_ack(ic);
......
...@@ -81,7 +81,10 @@ static void rds_message_purge(struct rds_message *rm) ...@@ -81,7 +81,10 @@ static void rds_message_purge(struct rds_message *rm)
void rds_message_put(struct rds_message *rm) void rds_message_put(struct rds_message *rm)
{ {
rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount)); rdsdebug("put rm %p ref %d\n", rm, atomic_read(&rm->m_refcount));
if (atomic_read(&rm->m_refcount) == 0) {
printk(KERN_CRIT "danger refcount zero on %p\n", rm);
WARN_ON(1);
}
if (atomic_dec_and_test(&rm->m_refcount)) { if (atomic_dec_and_test(&rm->m_refcount)) {
BUG_ON(!list_empty(&rm->m_sock_item)); BUG_ON(!list_empty(&rm->m_sock_item));
BUG_ON(!list_empty(&rm->m_conn_item)); BUG_ON(!list_empty(&rm->m_conn_item));
......
...@@ -93,6 +93,7 @@ struct rds_connection { ...@@ -93,6 +93,7 @@ struct rds_connection {
spinlock_t c_send_lock; /* protect send ring */ spinlock_t c_send_lock; /* protect send ring */
atomic_t c_send_generation; atomic_t c_send_generation;
atomic_t c_senders;
struct rds_message *c_xmit_rm; struct rds_message *c_xmit_rm;
unsigned long c_xmit_sg; unsigned long c_xmit_sg;
unsigned int c_xmit_hdr_off; unsigned int c_xmit_hdr_off;
......
...@@ -60,15 +60,23 @@ void rds_send_reset(struct rds_connection *conn) ...@@ -60,15 +60,23 @@ void rds_send_reset(struct rds_connection *conn)
struct rds_message *rm, *tmp; struct rds_message *rm, *tmp;
unsigned long flags; unsigned long flags;
spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) { if (conn->c_xmit_rm) {
rm = conn->c_xmit_rm;
conn->c_xmit_rm = NULL;
/* Tell the user the RDMA op is no longer mapped by the /* Tell the user the RDMA op is no longer mapped by the
* transport. This isn't entirely true (it's flushed out * transport. This isn't entirely true (it's flushed out
* independently) but as the connection is down, there's * independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */ * no ongoing RDMA to/from that memory */
rds_message_unmapped(conn->c_xmit_rm); printk(KERN_CRIT "send reset unmapping %p\n", rm);
rds_message_put(conn->c_xmit_rm); rds_message_unmapped(rm);
conn->c_xmit_rm = NULL; spin_unlock_irqrestore(&conn->c_send_lock, flags);
rds_message_put(rm);
} else {
spin_unlock_irqrestore(&conn->c_send_lock, flags);
} }
conn->c_xmit_sg = 0; conn->c_xmit_sg = 0;
conn->c_xmit_hdr_off = 0; conn->c_xmit_hdr_off = 0;
conn->c_xmit_data_off = 0; conn->c_xmit_data_off = 0;
...@@ -131,6 +139,7 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -131,6 +139,7 @@ int rds_send_xmit(struct rds_connection *conn)
ret = -ENOMEM; ret = -ENOMEM;
goto out; goto out;
} }
atomic_inc(&conn->c_senders);
if (conn->c_trans->xmit_prepare) if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn); conn->c_trans->xmit_prepare(conn);
...@@ -350,6 +359,8 @@ int rds_send_xmit(struct rds_connection *conn) ...@@ -350,6 +359,8 @@ int rds_send_xmit(struct rds_connection *conn)
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
} }
atomic_dec(&conn->c_senders);
/* /*
* Other senders will see we have c_send_lock and exit. We * Other senders will see we have c_send_lock and exit. We
* need to recheck the send queue and race again for c_send_lock * need to recheck the send queue and race again for c_send_lock
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment