Commit 001c1122 authored by David Howells's avatar David Howells

rxrpc: Maintain an extra ref on a conn for the cache list

Overhaul the usage count accounting for the rxrpc_connection struct to make
it easier to implement RCU access from the data_ready handler.

The problem is that currently we're using a lock to prevent the garbage
collector from trying to clean up a connection that we're contemplating
unidling.  We could just stick incoming packets on the connection we find,
but we've then got a problem that we may race when dispatching a work item
to process it as we need to give that a ref to prevent the rxrpc_connection
struct from disappearing in the meantime.

Further, incoming packets may get discarded if attached to an
rxrpc_connection struct that is going away.  Whilst this is not a total
disaster - the client will presumably resend - it would delay processing of
the call.  This would affect the AFS client filesystem's service manager
operation.

To this end:

 (1) We now maintain an extra count on the connection usage count whilst it
     is on the connection list.  This mean it is not in use when its
     refcount is 1.

 (2) When trying to reuse an old connection, we only increment the refcount
     if it is greater than 0.  If it is 0, we replace it in the tree with a
     new candidate connection.

 (3) Two connection flags are added to indicate whether or not a connection
     is in the local's client connection tree (used by sendmsg) or the
     peer's service connection tree (used by data_ready).  This makes sure
     that we don't try and remove a connection if it got replaced.

     The flags are tested under lock with the removal operation to prevent
     the reaper from killing the rxrpc_connection struct whilst someone
     else is trying to effect a replacement.

     This could probably be alleviated by using memory barriers between the
     flag set/test and the rb_tree ops.  The rb_tree op would still need to
     be under the lock, however.

 (4) When trying to reap an old connection, we try to flip the usage count
     from 1 to 0.  If it's not 1 at that point, then it must've come back
     to life temporarily and we ignore it.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent d991b4a3
...@@ -258,6 +258,8 @@ struct rxrpc_conn_parameters { ...@@ -258,6 +258,8 @@ struct rxrpc_conn_parameters {
*/ */
enum rxrpc_conn_flag { enum rxrpc_conn_flag {
RXRPC_CONN_HAS_IDR, /* Has a client conn ID assigned */ RXRPC_CONN_HAS_IDR, /* Has a client conn ID assigned */
RXRPC_CONN_IN_SERVICE_CONNS, /* Conn is in peer->service_conns */
RXRPC_CONN_IN_CLIENT_CONNS, /* Conn is in local->client_conns */
}; };
/* /*
...@@ -544,10 +546,10 @@ void __exit rxrpc_destroy_all_calls(void); ...@@ -544,10 +546,10 @@ void __exit rxrpc_destroy_all_calls(void);
*/ */
extern struct idr rxrpc_client_conn_ids; extern struct idr rxrpc_client_conn_ids;
void rxrpc_put_client_connection_id(struct rxrpc_connection *);
void rxrpc_destroy_client_conn_ids(void); void rxrpc_destroy_client_conn_ids(void);
int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *, int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
struct sockaddr_rxrpc *, gfp_t); struct sockaddr_rxrpc *, gfp_t);
void rxrpc_unpublish_client_conn(struct rxrpc_connection *);
/* /*
* conn_event.c * conn_event.c
...@@ -609,6 +611,7 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn) ...@@ -609,6 +611,7 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *, struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
struct sockaddr_rxrpc *, struct sockaddr_rxrpc *,
struct sk_buff *); struct sk_buff *);
void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
/* /*
* input.c * input.c
......
...@@ -84,7 +84,7 @@ static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn, ...@@ -84,7 +84,7 @@ static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
/* /*
* Release a connection ID for a client connection from the global pool. * Release a connection ID for a client connection from the global pool.
*/ */
void rxrpc_put_client_connection_id(struct rxrpc_connection *conn) static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
{ {
if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) { if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
spin_lock(&rxrpc_conn_id_lock); spin_lock(&rxrpc_conn_id_lock);
...@@ -278,12 +278,13 @@ int rxrpc_connect_call(struct rxrpc_call *call, ...@@ -278,12 +278,13 @@ int rxrpc_connect_call(struct rxrpc_call *call,
* lock before dropping the client conn lock. * lock before dropping the client conn lock.
*/ */
_debug("new conn"); _debug("new conn");
set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
rb_link_node(&candidate->client_node, parent, pp);
rb_insert_color(&candidate->client_node, &local->client_conns);
attached:
conn = candidate; conn = candidate;
candidate = NULL; candidate = NULL;
rb_link_node(&conn->client_node, parent, pp);
rb_insert_color(&conn->client_node, &local->client_conns);
atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1); atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
spin_lock(&conn->channel_lock); spin_lock(&conn->channel_lock);
spin_unlock(&local->client_conns_lock); spin_unlock(&local->client_conns_lock);
...@@ -307,13 +308,22 @@ int rxrpc_connect_call(struct rxrpc_call *call, ...@@ -307,13 +308,22 @@ int rxrpc_connect_call(struct rxrpc_call *call,
_leave(" = %p {u=%d}", conn, atomic_read(&conn->usage)); _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
return 0; return 0;
/* We found a suitable connection already in existence. Discard any /* We found a potentially suitable connection already in existence. If
* candidate we may have allocated, and try to get a channel on this * we can reuse it (ie. its usage count hasn't been reduced to 0 by the
* one. * reaper), discard any candidate we may have allocated, and try to get
* a channel on this one, otherwise we have to replace it.
*/ */
found_extant_conn: found_extant_conn:
_debug("found conn"); _debug("found conn");
rxrpc_get_connection(conn); if (!rxrpc_get_connection_maybe(conn)) {
set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
rb_replace_node(&conn->client_node,
&candidate->client_node,
&local->client_conns);
clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
goto attached;
}
spin_unlock(&local->client_conns_lock); spin_unlock(&local->client_conns_lock);
rxrpc_put_connection(candidate); rxrpc_put_connection(candidate);
...@@ -357,3 +367,19 @@ int rxrpc_connect_call(struct rxrpc_call *call, ...@@ -357,3 +367,19 @@ int rxrpc_connect_call(struct rxrpc_call *call,
_leave(" = -ERESTARTSYS"); _leave(" = -ERESTARTSYS");
return -ERESTARTSYS; return -ERESTARTSYS;
} }
/*
* Remove a client connection from the local endpoint's tree, thereby removing
* it as a target for reuse for new client calls.
*/
void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
{
struct rxrpc_local *local = conn->params.local;
spin_lock(&local->client_conns_lock);
if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
rb_erase(&conn->client_node, &local->client_conns);
spin_unlock(&local->client_conns_lock);
rxrpc_put_client_connection_id(conn);
}
...@@ -49,7 +49,10 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) ...@@ -49,7 +49,10 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
skb_queue_head_init(&conn->rx_queue); skb_queue_head_init(&conn->rx_queue);
conn->security = &rxrpc_no_security; conn->security = &rxrpc_no_security;
spin_lock_init(&conn->state_lock); spin_lock_init(&conn->state_lock);
atomic_set(&conn->usage, 1); /* We maintain an extra ref on the connection whilst it is
* on the rxrpc_connections list.
*/
atomic_set(&conn->usage, 2);
conn->debug_id = atomic_inc_return(&rxrpc_debug_id); conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
atomic_set(&conn->avail_chans, RXRPC_MAXCALLS); atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
conn->size_align = 4; conn->size_align = 4;
...@@ -111,7 +114,7 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local, ...@@ -111,7 +114,7 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
return NULL; return NULL;
found: found:
rxrpc_get_connection(conn); conn = rxrpc_get_connection_maybe(conn);
read_unlock_bh(&peer->conn_lock); read_unlock_bh(&peer->conn_lock);
_leave(" = %p", conn); _leave(" = %p", conn);
return conn; return conn;
...@@ -173,10 +176,10 @@ void rxrpc_put_connection(struct rxrpc_connection *conn) ...@@ -173,10 +176,10 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
_enter("%p{u=%d,d=%d}", _enter("%p{u=%d,d=%d}",
conn, atomic_read(&conn->usage), conn->debug_id); conn, atomic_read(&conn->usage), conn->debug_id);
ASSERTCMP(atomic_read(&conn->usage), >, 0); ASSERTCMP(atomic_read(&conn->usage), >, 1);
conn->put_time = ktime_get_seconds(); conn->put_time = ktime_get_seconds();
if (atomic_dec_and_test(&conn->usage)) { if (atomic_dec_return(&conn->usage) == 1) {
_debug("zombie"); _debug("zombie");
rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0); rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
} }
...@@ -216,59 +219,41 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu) ...@@ -216,59 +219,41 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
static void rxrpc_connection_reaper(struct work_struct *work) static void rxrpc_connection_reaper(struct work_struct *work)
{ {
struct rxrpc_connection *conn, *_p; struct rxrpc_connection *conn, *_p;
struct rxrpc_peer *peer; unsigned long reap_older_than, earliest, put_time, now;
unsigned long now, earliest, reap_time;
LIST_HEAD(graveyard); LIST_HEAD(graveyard);
_enter(""); _enter("");
now = ktime_get_seconds(); now = ktime_get_seconds();
reap_older_than = now - rxrpc_connection_expiry;
earliest = ULONG_MAX; earliest = ULONG_MAX;
write_lock(&rxrpc_connection_lock); write_lock(&rxrpc_connection_lock);
list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) { list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
_debug("reap CONN %d { u=%d,t=%ld }", ASSERTCMP(atomic_read(&conn->usage), >, 0);
conn->debug_id, atomic_read(&conn->usage), if (likely(atomic_read(&conn->usage) > 1))
(long) now - (long) conn->put_time);
if (likely(atomic_read(&conn->usage) > 0))
continue; continue;
if (rxrpc_conn_is_client(conn)) { put_time = READ_ONCE(conn->put_time);
struct rxrpc_local *local = conn->params.local; if (time_after(put_time, reap_older_than)) {
spin_lock(&local->client_conns_lock); if (time_before(put_time, earliest))
reap_time = conn->put_time + rxrpc_connection_expiry; earliest = put_time;
continue;
if (atomic_read(&conn->usage) > 0) {
;
} else if (reap_time <= now) {
list_move_tail(&conn->link, &graveyard);
rxrpc_put_client_connection_id(conn);
rb_erase(&conn->client_node,
&local->client_conns);
} else if (reap_time < earliest) {
earliest = reap_time;
}
spin_unlock(&local->client_conns_lock);
} else {
peer = conn->params.peer;
write_lock_bh(&peer->conn_lock);
reap_time = conn->put_time + rxrpc_connection_expiry;
if (atomic_read(&conn->usage) > 0) {
;
} else if (reap_time <= now) {
list_move_tail(&conn->link, &graveyard);
rb_erase(&conn->service_node,
&peer->service_conns);
} else if (reap_time < earliest) {
earliest = reap_time;
}
write_unlock_bh(&peer->conn_lock);
} }
/* The usage count sits at 1 whilst the object is unused on the
* list; we reduce that to 0 to make the object unavailable.
*/
if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
continue;
if (rxrpc_conn_is_client(conn))
rxrpc_unpublish_client_conn(conn);
else
rxrpc_unpublish_service_conn(conn);
list_move_tail(&conn->link, &graveyard);
} }
write_unlock(&rxrpc_connection_lock); write_unlock(&rxrpc_connection_lock);
...@@ -279,7 +264,6 @@ static void rxrpc_connection_reaper(struct work_struct *work) ...@@ -279,7 +264,6 @@ static void rxrpc_connection_reaper(struct work_struct *work)
(earliest - now) * HZ); (earliest - now) * HZ);
} }
/* then destroy all those pulled out */
while (!list_empty(&graveyard)) { while (!list_empty(&graveyard)) {
conn = list_entry(graveyard.next, struct rxrpc_connection, conn = list_entry(graveyard.next, struct rxrpc_connection,
link); link);
......
...@@ -104,10 +104,12 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local, ...@@ -104,10 +104,12 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
} }
/* we can now add the new candidate to the list */ /* we can now add the new candidate to the list */
set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
rb_link_node(&candidate->service_node, p, pp);
rb_insert_color(&candidate->service_node, &peer->service_conns);
attached:
conn = candidate; conn = candidate;
candidate = NULL; candidate = NULL;
rb_link_node(&conn->service_node, p, pp);
rb_insert_color(&conn->service_node, &peer->service_conns);
rxrpc_get_peer(peer); rxrpc_get_peer(peer);
rxrpc_get_local(local); rxrpc_get_local(local);
...@@ -128,11 +130,19 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local, ...@@ -128,11 +130,19 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
/* we found the connection in the list immediately */ /* we found the connection in the list immediately */
found_extant_connection: found_extant_connection:
if (!rxrpc_get_connection_maybe(conn)) {
set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
rb_replace_node(&conn->service_node,
&candidate->service_node,
&peer->service_conns);
clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
goto attached;
}
if (sp->hdr.securityIndex != conn->security_ix) { if (sp->hdr.securityIndex != conn->security_ix) {
read_unlock_bh(&peer->conn_lock); read_unlock_bh(&peer->conn_lock);
goto security_mismatch; goto security_mismatch_put;
} }
rxrpc_get_connection(conn);
read_unlock_bh(&peer->conn_lock); read_unlock_bh(&peer->conn_lock);
goto success; goto success;
...@@ -147,8 +157,24 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local, ...@@ -147,8 +157,24 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
kfree(candidate); kfree(candidate);
goto success; goto success;
security_mismatch_put:
rxrpc_put_connection(conn);
security_mismatch: security_mismatch:
kfree(candidate); kfree(candidate);
_leave(" = -EKEYREJECTED"); _leave(" = -EKEYREJECTED");
return ERR_PTR(-EKEYREJECTED); return ERR_PTR(-EKEYREJECTED);
} }
/*
* Remove the service connection from the peer's tree, thereby removing it as a
* target for incoming packets.
*/
void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
{
struct rxrpc_peer *peer = conn->params.peer;
write_lock_bh(&peer->conn_lock);
if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
rb_erase(&conn->service_node, &peer->service_conns);
write_unlock_bh(&peer->conn_lock);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment