Commit a47666eb authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

fs: dlm: make connection hash lockless

There are some problems with the connections_lock. During my
experiements I saw sometimes circular dependencies with sock_lock.
The reason here might be code parts which runs nodeid2con() before
or after sock_lock is acquired.

Another issue are missing locks in for_conn() iteration. Maybe this
works fine because for_conn() is running in a context where
connection_hash cannot be manipulated by others anymore.

However this patch changes the connection_hash to be protected by
sleepable rcu. The hotpath function __find_con() is implemented
lockless as it is only a reader of connection_hash and this hopefully
fixes the circular locking dependencies. The iteration for_conn() will
still call some sleepable functionality, that's why we use sleepable rcu
in this case.

This patch removes the kmemcache functionality as I think I need to
make some free() functionality via call_rcu(). However allocation time
isn't here an issue. The dlm_allow_con will not be protected by a lock
anymore as I think it's enough to just set and flush workqueues
afterwards.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent aa7ab1e2
...@@ -4,6 +4,7 @@ menuconfig DLM ...@@ -4,6 +4,7 @@ menuconfig DLM
depends on INET depends on INET
depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n) depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n)
select IP_SCTP select IP_SCTP
select SRCU
help help
A general purpose distributed lock manager for kernel or userspace A general purpose distributed lock manager for kernel or userspace
applications. applications.
......
...@@ -126,6 +126,7 @@ struct connection { ...@@ -126,6 +126,7 @@ struct connection {
struct work_struct rwork; /* Receive workqueue */ struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */ struct work_struct swork; /* Send workqueue */
wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
struct rcu_head rcu;
}; };
#define sock2con(x) ((struct connection *)(x)->sk_user_data) #define sock2con(x) ((struct connection *)(x)->sk_user_data)
...@@ -167,8 +168,8 @@ static struct workqueue_struct *recv_workqueue; ...@@ -167,8 +168,8 @@ static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue; static struct workqueue_struct *send_workqueue;
static struct hlist_head connection_hash[CONN_HASH_SIZE]; static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock); static DEFINE_SPINLOCK(connections_lock);
static struct kmem_cache *con_cache; DEFINE_STATIC_SRCU(connections_srcu);
static void process_recv_sockets(struct work_struct *work); static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work);
...@@ -184,15 +185,20 @@ static inline int nodeid_hash(int nodeid) ...@@ -184,15 +185,20 @@ static inline int nodeid_hash(int nodeid)
static struct connection *__find_con(int nodeid) static struct connection *__find_con(int nodeid)
{ {
int r; int r, idx;
struct connection *con; struct connection *con;
r = nodeid_hash(nodeid); r = nodeid_hash(nodeid);
hlist_for_each_entry(con, &connection_hash[r], list) { idx = srcu_read_lock(&connections_srcu);
if (con->nodeid == nodeid) hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
if (con->nodeid == nodeid) {
srcu_read_unlock(&connections_srcu, idx);
return con; return con;
}
} }
srcu_read_unlock(&connections_srcu, idx);
return NULL; return NULL;
} }
...@@ -200,7 +206,7 @@ static struct connection *__find_con(int nodeid) ...@@ -200,7 +206,7 @@ static struct connection *__find_con(int nodeid)
* If 'allocation' is zero then we don't attempt to create a new * If 'allocation' is zero then we don't attempt to create a new
* connection structure for this node. * connection structure for this node.
*/ */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc) static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{ {
struct connection *con = NULL; struct connection *con = NULL;
int r; int r;
...@@ -209,13 +215,10 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) ...@@ -209,13 +215,10 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
if (con || !alloc) if (con || !alloc)
return con; return con;
con = kmem_cache_zalloc(con_cache, alloc); con = kzalloc(sizeof(*con), alloc);
if (!con) if (!con)
return NULL; return NULL;
r = nodeid_hash(nodeid);
hlist_add_head(&con->list, &connection_hash[r]);
con->nodeid = nodeid; con->nodeid = nodeid;
mutex_init(&con->sock_mutex); mutex_init(&con->sock_mutex);
INIT_LIST_HEAD(&con->writequeue); INIT_LIST_HEAD(&con->writequeue);
...@@ -233,31 +236,27 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) ...@@ -233,31 +236,27 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
con->rx_action = zerocon->rx_action; con->rx_action = zerocon->rx_action;
} }
r = nodeid_hash(nodeid);
spin_lock(&connections_lock);
hlist_add_head_rcu(&con->list, &connection_hash[r]);
spin_unlock(&connections_lock);
return con; return con;
} }
/* Loop round all connections */ /* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c)) static void foreach_conn(void (*conn_func)(struct connection *c))
{ {
int i; int i, idx;
struct hlist_node *n;
struct connection *con; struct connection *con;
idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE; i++) { for (i = 0; i < CONN_HASH_SIZE; i++) {
hlist_for_each_entry_safe(con, n, &connection_hash[i], list) hlist_for_each_entry_rcu(con, &connection_hash[i], list)
conn_func(con); conn_func(con);
} }
} srcu_read_unlock(&connections_srcu, idx);
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
struct connection *con;
mutex_lock(&connections_lock);
con = __nodeid2con(nodeid, allocation);
mutex_unlock(&connections_lock);
return con;
} }
static struct dlm_node_addr *find_node_addr(int nodeid) static struct dlm_node_addr *find_node_addr(int nodeid)
...@@ -792,12 +791,9 @@ static int accept_from_sock(struct connection *con) ...@@ -792,12 +791,9 @@ static int accept_from_sock(struct connection *con)
struct connection *newcon; struct connection *newcon;
struct connection *addcon; struct connection *addcon;
mutex_lock(&connections_lock);
if (!dlm_allow_conn) { if (!dlm_allow_conn) {
mutex_unlock(&connections_lock);
return -1; return -1;
} }
mutex_unlock(&connections_lock);
mutex_lock_nested(&con->sock_mutex, 0); mutex_lock_nested(&con->sock_mutex, 0);
...@@ -847,7 +843,7 @@ static int accept_from_sock(struct connection *con) ...@@ -847,7 +843,7 @@ static int accept_from_sock(struct connection *con)
struct connection *othercon = newcon->othercon; struct connection *othercon = newcon->othercon;
if (!othercon) { if (!othercon) {
othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
if (!othercon) { if (!othercon) {
log_print("failed to allocate incoming socket"); log_print("failed to allocate incoming socket");
mutex_unlock(&newcon->sock_mutex); mutex_unlock(&newcon->sock_mutex);
...@@ -1612,16 +1608,17 @@ static void free_conn(struct connection *con) ...@@ -1612,16 +1608,17 @@ static void free_conn(struct connection *con)
{ {
close_connection(con, true, true, true); close_connection(con, true, true, true);
if (con->othercon) if (con->othercon)
kmem_cache_free(con_cache, con->othercon); kfree_rcu(con->othercon, rcu);
hlist_del(&con->list); spin_lock(&connections_lock);
kmem_cache_free(con_cache, con); hlist_del_rcu(&con->list);
spin_unlock(&connections_lock);
kfree_rcu(con, rcu);
} }
static void work_flush(void) static void work_flush(void)
{ {
int ok; int ok, idx;
int i; int i;
struct hlist_node *n;
struct connection *con; struct connection *con;
do { do {
...@@ -1631,9 +1628,10 @@ static void work_flush(void) ...@@ -1631,9 +1628,10 @@ static void work_flush(void)
flush_workqueue(recv_workqueue); flush_workqueue(recv_workqueue);
if (send_workqueue) if (send_workqueue)
flush_workqueue(send_workqueue); flush_workqueue(send_workqueue);
idx = srcu_read_lock(&connections_srcu);
for (i = 0; i < CONN_HASH_SIZE && ok; i++) { for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
hlist_for_each_entry_safe(con, n, hlist_for_each_entry_rcu(con, &connection_hash[i],
&connection_hash[i], list) { list) {
ok &= test_bit(CF_READ_PENDING, &con->flags); ok &= test_bit(CF_READ_PENDING, &con->flags);
ok &= test_bit(CF_WRITE_PENDING, &con->flags); ok &= test_bit(CF_WRITE_PENDING, &con->flags);
if (con->othercon) { if (con->othercon) {
...@@ -1644,6 +1642,7 @@ static void work_flush(void) ...@@ -1644,6 +1642,7 @@ static void work_flush(void)
} }
} }
} }
srcu_read_unlock(&connections_srcu, idx);
} while (!ok); } while (!ok);
} }
...@@ -1652,9 +1651,7 @@ void dlm_lowcomms_stop(void) ...@@ -1652,9 +1651,7 @@ void dlm_lowcomms_stop(void)
/* Set all the flags to prevent any /* Set all the flags to prevent any
socket activity. socket activity.
*/ */
mutex_lock(&connections_lock);
dlm_allow_conn = 0; dlm_allow_conn = 0;
mutex_unlock(&connections_lock);
if (recv_workqueue) if (recv_workqueue)
flush_workqueue(recv_workqueue); flush_workqueue(recv_workqueue);
...@@ -1666,8 +1663,6 @@ void dlm_lowcomms_stop(void) ...@@ -1666,8 +1663,6 @@ void dlm_lowcomms_stop(void)
clean_writequeues(); clean_writequeues();
foreach_conn(free_conn); foreach_conn(free_conn);
work_stop(); work_stop();
kmem_cache_destroy(con_cache);
} }
int dlm_lowcomms_start(void) int dlm_lowcomms_start(void)
...@@ -1686,16 +1681,9 @@ int dlm_lowcomms_start(void) ...@@ -1686,16 +1681,9 @@ int dlm_lowcomms_start(void)
goto fail; goto fail;
} }
error = -ENOMEM;
con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
__alignof__(struct connection), 0,
NULL);
if (!con_cache)
goto fail;
error = work_start(); error = work_start();
if (error) if (error)
goto fail_destroy; goto fail;
dlm_allow_conn = 1; dlm_allow_conn = 1;
...@@ -1714,10 +1702,8 @@ int dlm_lowcomms_start(void) ...@@ -1714,10 +1702,8 @@ int dlm_lowcomms_start(void)
con = nodeid2con(0,0); con = nodeid2con(0,0);
if (con) { if (con) {
close_connection(con, false, true, true); close_connection(con, false, true, true);
kmem_cache_free(con_cache, con); kfree_rcu(con, rcu);
} }
fail_destroy:
kmem_cache_destroy(con_cache);
fail: fail:
return error; return error;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment