Commit bcf50ef2 authored by Chris Mason, committed by Andy Grover

rds: use RCU to protect the connection hash

The connection hash was almost entirely RCU ready; this
just makes the final couple of changes to use RCU instead
of spinlocks for everything.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
parent abf45439
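
For readers coming to this cold, the conversion follows the standard Linux discipline for an RCU-protected hash of hlists: writers keep serializing on rds_conn_lock but switch to the _rcu list helpers, readers walk the buckets under rcu_read_lock() with no spinlock and no IRQ disabling, and a removed entry may only be freed after a grace period. Below is a minimal sketch of that discipline; every name in it (my_obj, my_hash, my_lock, my_insert, ...) is illustrative rather than part of this patch, and it uses the four-argument hlist_for_each_entry_rcu() form of this era (later kernels dropped the separate struct hlist_node cursor).

	/* Sketch only: the RCU hash-list pattern this commit applies to RDS. */
	#include <linux/rculist.h>
	#include <linux/rcupdate.h>
	#include <linux/slab.h>
	#include <linux/spinlock.h>
	#include <linux/types.h>

	struct my_obj {
		u32 key;
		struct hlist_node node;
	};

	static struct hlist_head my_hash[64];
	static DEFINE_SPINLOCK(my_lock);

	/* Writers still serialize on a spinlock, but use the _rcu helpers so
	 * that concurrent lockless readers always see a consistent list. */
	static void my_insert(struct my_obj *obj)
	{
		spin_lock(&my_lock);
		hlist_add_head_rcu(&obj->node, &my_hash[obj->key % 64]);
		spin_unlock(&my_lock);
	}

	/* Readers only mark an RCU read-side critical section. A caller that
	 * uses the result after rcu_read_unlock() must arrange for the object
	 * to stay alive, e.g. via a refcount or (as this patch does for the
	 * destroy path) a synchronize_rcu() before the object is freed. */
	static struct my_obj *my_lookup(u32 key)
	{
		struct my_obj *obj, *ret = NULL;
		struct hlist_node *pos;

		rcu_read_lock();
		hlist_for_each_entry_rcu(obj, pos, &my_hash[key % 64], node) {
			if (obj->key == key) {
				ret = obj;
				break;
			}
		}
		rcu_read_unlock();
		return ret;
	}

	/* Removal: unlink under the lock, wait out the readers, then free. */
	static void my_remove(struct my_obj *obj)
	{
		spin_lock(&my_lock);
		hlist_del_init_rcu(&obj->node);
		spin_unlock(&my_lock);
		synchronize_rcu();	/* readers that might still see obj finish */
		kfree(obj);
	}
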
@@ -62,6 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
 	var |= RDS_INFO_CONNECTION_FLAG_##suffix;	\
 } while (0)
 
+/* rcu read lock must be held or the connection spinlock */
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
 					      __be32 laddr, __be32 faddr,
 					      struct rds_transport *trans)
@@ -69,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
 	struct rds_connection *conn, *ret = NULL;
 	struct hlist_node *pos;
 
-	hlist_for_each_entry(conn, pos, head, c_hash_node) {
+	hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 		if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
 		    conn->c_trans == trans) {
 			ret = conn;
@@ -119,7 +120,8 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	unsigned long flags;
 	int ret;
 
-	spin_lock_irqsave(&rds_conn_lock, flags);
+
+	rcu_read_lock();
 	conn = rds_conn_lookup(head, laddr, faddr, trans);
 	if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
 	    !is_outgoing) {
@@ -130,7 +132,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 		parent = conn;
 		conn = parent->c_passive;
 	}
-	spin_unlock_irqrestore(&rds_conn_lock, flags);
+	rcu_read_unlock();
 	if (conn)
 		goto out;
 
@@ -227,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 		kmem_cache_free(rds_conn_slab, conn);
 		conn = found;
 	} else {
-		hlist_add_head(&conn->c_hash_node, head);
+		hlist_add_head_rcu(&conn->c_hash_node, head);
 		rds_cong_add_conn(conn);
 		rds_conn_count++;
 	}
@@ -306,8 +308,13 @@ void rds_conn_shutdown(struct rds_connection *conn)
 	 * to the conn hash, so we never trigger a reconnect on this
 	 * conn - the reconnect is always triggered by the active peer. */
 	cancel_delayed_work_sync(&conn->c_conn_w);
-	if (!hlist_unhashed(&conn->c_hash_node))
+	rcu_read_lock();
+	if (!hlist_unhashed(&conn->c_hash_node)) {
+		rcu_read_unlock();
 		rds_queue_reconnect(conn);
+	} else {
+		rcu_read_unlock();
+	}
 }
 
 /*
@@ -323,14 +330,12 @@ void rds_conn_destroy(struct rds_connection *conn)
 	/* Ensure conn will not be scheduled for reconnect */
 	spin_lock_irq(&rds_conn_lock);
-	hlist_del_init(&conn->c_hash_node);
+	hlist_del_init_rcu(&conn->c_hash_node);
 	spin_unlock_irq(&rds_conn_lock);
+	synchronize_rcu();
 
-	/* wait for the rds thread to shut it down */
-	atomic_set(&conn->c_state, RDS_CONN_ERROR);
-	cancel_delayed_work(&conn->c_conn_w);
-	queue_work(rds_wq, &conn->c_down_w);
-	flush_workqueue(rds_wq);
+	rds_conn_shutdown(conn);
 
 	/* tear down queued messages */
 	list_for_each_entry_safe(rm, rtmp,
@@ -369,17 +374,16 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
 	struct list_head *list;
 	struct rds_connection *conn;
 	struct rds_message *rm;
-	unsigned long flags;
 	unsigned int total = 0;
 	size_t i;
 
 	len /= sizeof(struct rds_info_message);
 
-	spin_lock_irqsave(&rds_conn_lock, flags);
+	rcu_read_lock();
 
 	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 	     i++, head++) {
-		hlist_for_each_entry(conn, pos, head, c_hash_node) {
+		hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 			if (want_send)
 				list = &conn->c_send_queue;
 			else
@@ -399,8 +403,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
 			spin_unlock(&conn->c_lock);
 		}
 	}
-
-	spin_unlock_irqrestore(&rds_conn_lock, flags);
+	rcu_read_unlock();
 
 	lens->nr = total;
 	lens->each = sizeof(struct rds_info_message);
@@ -430,19 +433,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 	uint64_t buffer[(item_len + 7) / 8];
 	struct hlist_head *head;
 	struct hlist_node *pos;
-	struct hlist_node *tmp;
 	struct rds_connection *conn;
-	unsigned long flags;
 	size_t i;
 
-	spin_lock_irqsave(&rds_conn_lock, flags);
+	rcu_read_lock();
 
 	lens->nr = 0;
 	lens->each = item_len;
 
 	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 	     i++, head++) {
-		hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+		hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 			/* XXX no c_lock usage.. */
 			if (!visitor(conn, buffer))
@@ -458,8 +459,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 			lens->nr++;
 		}
 	}
-
-	spin_unlock_irqrestore(&rds_conn_lock, flags);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
...
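
The one subtle ordering here is in rds_conn_destroy(): after hlist_del_init_rcu() unlinks the connection, new lookups can no longer find it, but lockless readers that found it earlier may still be using it, so synchronize_rcu() must complete before the shutdown and eventual free that follow. Annotated re-excerpt of the new destroy path (the actual freeing of conn happens further down in rds_conn_destroy(), outside the hunks shown above):

	spin_lock_irq(&rds_conn_lock);
	hlist_del_init_rcu(&conn->c_hash_node);	/* new lookups miss conn */
	spin_unlock_irq(&rds_conn_lock);

	synchronize_rcu();	/* every rcu_read_lock() section that could
				 * still hold a pointer to conn has exited */

	rds_conn_shutdown(conn);	/* now safe to tear the conn down */
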