Commit 0d6bf319 authored by David Howells's avatar David Howells

rxrpc: Move the client conn cache management to the I/O thread

Move the management of the client connection cache to the I/O thread rather
than managing it from the namespace as an aggregate across all the local
endpoints within the namespace.

This will allow a load of locking to be got rid of in a future patch as
only the I/O thread will be looking at the this.

The downside is that the total number of cached connections on the system
can get higher because the limit is now per-local rather than per-netns.
We can, however, keep the number of client conns in use across the entire
netfs and use that to reduce the expiration time of idle connection.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 96b4059f
...@@ -76,13 +76,7 @@ struct rxrpc_net { ...@@ -76,13 +76,7 @@ struct rxrpc_net {
bool live; bool live;
bool kill_all_client_conns;
atomic_t nr_client_conns; atomic_t nr_client_conns;
spinlock_t client_conn_cache_lock; /* Lock for ->*_client_conns */
struct mutex client_conn_discard_lock; /* Prevent multiple discarders */
struct list_head idle_client_conns;
struct work_struct client_conn_reaper;
struct timer_list client_conn_reap_timer;
struct hlist_head local_endpoints; struct hlist_head local_endpoints;
struct mutex local_mutex; /* Lock for ->local_endpoints */ struct mutex local_mutex; /* Lock for ->local_endpoints */
...@@ -294,8 +288,16 @@ struct rxrpc_local { ...@@ -294,8 +288,16 @@ struct rxrpc_local {
struct sk_buff_head rx_queue; /* Received packets */ struct sk_buff_head rx_queue; /* Received packets */
struct list_head conn_attend_q; /* Conns requiring immediate attention */ struct list_head conn_attend_q; /* Conns requiring immediate attention */
struct list_head call_attend_q; /* Calls requiring immediate attention */ struct list_head call_attend_q; /* Calls requiring immediate attention */
struct rb_root client_bundles; /* Client connection bundles by socket params */ struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */ spinlock_t client_bundles_lock; /* Lock for client_bundles */
bool kill_all_client_conns;
spinlock_t client_conn_cache_lock; /* Lock for ->*_client_conns */
struct list_head idle_client_conns;
struct timer_list client_conn_reap_timer;
unsigned long client_conn_flags;
#define RXRPC_CLIENT_CONN_REAP_TIMER 0 /* The client conn reap timer expired */
spinlock_t lock; /* access lock */ spinlock_t lock; /* access lock */
rwlock_t services_lock; /* lock for services list */ rwlock_t services_lock; /* lock for services list */
int debug_id; /* debug ID for printks */ int debug_id; /* debug ID for printks */
...@@ -946,8 +948,7 @@ void rxrpc_expose_client_call(struct rxrpc_call *); ...@@ -946,8 +948,7 @@ void rxrpc_expose_client_call(struct rxrpc_call *);
void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *); void rxrpc_disconnect_client_call(struct rxrpc_bundle *, struct rxrpc_call *);
void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle); void rxrpc_deactivate_bundle(struct rxrpc_bundle *bundle);
void rxrpc_put_client_conn(struct rxrpc_connection *, enum rxrpc_conn_trace); void rxrpc_put_client_conn(struct rxrpc_connection *, enum rxrpc_conn_trace);
void rxrpc_discard_expired_client_conns(struct work_struct *); void rxrpc_discard_expired_client_conns(struct rxrpc_local *local);
void rxrpc_destroy_all_client_connections(struct rxrpc_net *);
void rxrpc_clean_up_local_conns(struct rxrpc_local *); void rxrpc_clean_up_local_conns(struct rxrpc_local *);
/* /*
......
...@@ -578,17 +578,17 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn, ...@@ -578,17 +578,17 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
*/ */
static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn) static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
{ {
struct rxrpc_net *rxnet = bundle->local->rxnet; struct rxrpc_local *local = bundle->local;
bool drop_ref; bool drop_ref;
if (!list_empty(&conn->cache_link)) { if (!list_empty(&conn->cache_link)) {
drop_ref = false; drop_ref = false;
spin_lock(&rxnet->client_conn_cache_lock); spin_lock(&local->client_conn_cache_lock);
if (!list_empty(&conn->cache_link)) { if (!list_empty(&conn->cache_link)) {
list_del_init(&conn->cache_link); list_del_init(&conn->cache_link);
drop_ref = true; drop_ref = true;
} }
spin_unlock(&rxnet->client_conn_cache_lock); spin_unlock(&local->client_conn_cache_lock);
if (drop_ref) if (drop_ref)
rxrpc_put_connection(conn, rxrpc_conn_put_unidle); rxrpc_put_connection(conn, rxrpc_conn_put_unidle);
} }
...@@ -710,14 +710,10 @@ static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle, ...@@ -710,14 +710,10 @@ static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle,
int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp) int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
{ {
struct rxrpc_bundle *bundle; struct rxrpc_bundle *bundle;
struct rxrpc_local *local = call->local;
struct rxrpc_net *rxnet = local->rxnet;
int ret = 0; int ret = 0;
_enter("{%d,%lx},", call->debug_id, call->user_call_ID); _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);
rxrpc_get_call(call, rxrpc_call_get_io_thread); rxrpc_get_call(call, rxrpc_call_get_io_thread);
bundle = rxrpc_prep_call(call, gfp); bundle = rxrpc_prep_call(call, gfp);
...@@ -787,14 +783,14 @@ void rxrpc_expose_client_call(struct rxrpc_call *call) ...@@ -787,14 +783,14 @@ void rxrpc_expose_client_call(struct rxrpc_call *call)
/* /*
* Set the reap timer. * Set the reap timer.
*/ */
static void rxrpc_set_client_reap_timer(struct rxrpc_net *rxnet) static void rxrpc_set_client_reap_timer(struct rxrpc_local *local)
{ {
if (!rxnet->kill_all_client_conns) { if (!local->kill_all_client_conns) {
unsigned long now = jiffies; unsigned long now = jiffies;
unsigned long reap_at = now + rxrpc_conn_idle_client_expiry; unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;
if (rxnet->live) if (local->rxnet->live)
timer_reduce(&rxnet->client_conn_reap_timer, reap_at); timer_reduce(&local->client_conn_reap_timer, reap_at);
} }
} }
...@@ -805,7 +801,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call ...@@ -805,7 +801,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
{ {
struct rxrpc_connection *conn; struct rxrpc_connection *conn;
struct rxrpc_channel *chan = NULL; struct rxrpc_channel *chan = NULL;
struct rxrpc_net *rxnet = bundle->local->rxnet; struct rxrpc_local *local = bundle->local;
unsigned int channel; unsigned int channel;
bool may_reuse; bool may_reuse;
u32 cid; u32 cid;
...@@ -895,11 +891,11 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call ...@@ -895,11 +891,11 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
conn->idle_timestamp = jiffies; conn->idle_timestamp = jiffies;
rxrpc_get_connection(conn, rxrpc_conn_get_idle); rxrpc_get_connection(conn, rxrpc_conn_get_idle);
spin_lock(&rxnet->client_conn_cache_lock); spin_lock(&local->client_conn_cache_lock);
list_move_tail(&conn->cache_link, &rxnet->idle_client_conns); list_move_tail(&conn->cache_link, &local->idle_client_conns);
spin_unlock(&rxnet->client_conn_cache_lock); spin_unlock(&local->client_conn_cache_lock);
rxrpc_set_client_reap_timer(rxnet); rxrpc_set_client_reap_timer(local);
} }
out: out:
...@@ -986,42 +982,34 @@ void rxrpc_kill_client_conn(struct rxrpc_connection *conn) ...@@ -986,42 +982,34 @@ void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
* This may be called from conn setup or from a work item so cannot be * This may be called from conn setup or from a work item so cannot be
* considered non-reentrant. * considered non-reentrant.
*/ */
void rxrpc_discard_expired_client_conns(struct work_struct *work) void rxrpc_discard_expired_client_conns(struct rxrpc_local *local)
{ {
struct rxrpc_connection *conn; struct rxrpc_connection *conn;
struct rxrpc_net *rxnet =
container_of(work, struct rxrpc_net, client_conn_reaper);
unsigned long expiry, conn_expires_at, now; unsigned long expiry, conn_expires_at, now;
unsigned int nr_conns; unsigned int nr_conns;
_enter(""); _enter("");
if (list_empty(&rxnet->idle_client_conns)) { if (list_empty(&local->idle_client_conns)) {
_leave(" [empty]"); _leave(" [empty]");
return; return;
} }
/* Don't double up on the discarding */
if (!mutex_trylock(&rxnet->client_conn_discard_lock)) {
_leave(" [already]");
return;
}
/* We keep an estimate of what the number of conns ought to be after /* We keep an estimate of what the number of conns ought to be after
* we've discarded some so that we don't overdo the discarding. * we've discarded some so that we don't overdo the discarding.
*/ */
nr_conns = atomic_read(&rxnet->nr_client_conns); nr_conns = atomic_read(&local->rxnet->nr_client_conns);
next: next:
spin_lock(&rxnet->client_conn_cache_lock); spin_lock(&local->client_conn_cache_lock);
if (list_empty(&rxnet->idle_client_conns)) if (list_empty(&local->idle_client_conns))
goto out; goto out;
conn = list_entry(rxnet->idle_client_conns.next, conn = list_entry(local->idle_client_conns.next,
struct rxrpc_connection, cache_link); struct rxrpc_connection, cache_link);
if (!rxnet->kill_all_client_conns) { if (!local->kill_all_client_conns) {
/* If the number of connections is over the reap limit, we /* If the number of connections is over the reap limit, we
* expedite discard by reducing the expiry timeout. We must, * expedite discard by reducing the expiry timeout. We must,
* however, have at least a short grace period to be able to do * however, have at least a short grace period to be able to do
...@@ -1044,7 +1032,7 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work) ...@@ -1044,7 +1032,7 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work)
trace_rxrpc_client(conn, -1, rxrpc_client_discard); trace_rxrpc_client(conn, -1, rxrpc_client_discard);
list_del_init(&conn->cache_link); list_del_init(&conn->cache_link);
spin_unlock(&rxnet->client_conn_cache_lock); spin_unlock(&local->client_conn_cache_lock);
rxrpc_unbundle_conn(conn); rxrpc_unbundle_conn(conn);
/* Drop the ->cache_link ref */ /* Drop the ->cache_link ref */
...@@ -1062,32 +1050,11 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work) ...@@ -1062,32 +1050,11 @@ void rxrpc_discard_expired_client_conns(struct work_struct *work)
* then things get messier. * then things get messier.
*/ */
_debug("not yet"); _debug("not yet");
if (!rxnet->kill_all_client_conns) if (!local->kill_all_client_conns)
timer_reduce(&rxnet->client_conn_reap_timer, conn_expires_at); timer_reduce(&local->client_conn_reap_timer, conn_expires_at);
out: out:
spin_unlock(&rxnet->client_conn_cache_lock); spin_unlock(&local->client_conn_cache_lock);
mutex_unlock(&rxnet->client_conn_discard_lock);
_leave("");
}
/*
* Preemptively destroy all the client connection records rather than waiting
* for them to time out
*/
void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
{
_enter("");
spin_lock(&rxnet->client_conn_cache_lock);
rxnet->kill_all_client_conns = true;
spin_unlock(&rxnet->client_conn_cache_lock);
del_timer_sync(&rxnet->client_conn_reap_timer);
if (!rxrpc_queue_work(&rxnet->client_conn_reaper))
_debug("destroy: queue failed");
_leave(""); _leave("");
} }
...@@ -1097,14 +1064,19 @@ void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet) ...@@ -1097,14 +1064,19 @@ void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
void rxrpc_clean_up_local_conns(struct rxrpc_local *local) void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
{ {
struct rxrpc_connection *conn, *tmp; struct rxrpc_connection *conn, *tmp;
struct rxrpc_net *rxnet = local->rxnet;
LIST_HEAD(graveyard); LIST_HEAD(graveyard);
_enter(""); _enter("");
spin_lock(&rxnet->client_conn_cache_lock); spin_lock(&local->client_conn_cache_lock);
local->kill_all_client_conns = true;
spin_unlock(&local->client_conn_cache_lock);
del_timer_sync(&local->client_conn_reap_timer);
spin_lock(&local->client_conn_cache_lock);
list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns, list_for_each_entry_safe(conn, tmp, &local->idle_client_conns,
cache_link) { cache_link) {
if (conn->local == local) { if (conn->local == local) {
atomic_dec(&conn->active); atomic_dec(&conn->active);
...@@ -1113,7 +1085,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *local) ...@@ -1113,7 +1085,7 @@ void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
} }
} }
spin_unlock(&rxnet->client_conn_cache_lock); spin_unlock(&local->client_conn_cache_lock);
while (!list_empty(&graveyard)) { while (!list_empty(&graveyard)) {
conn = list_entry(graveyard.next, conn = list_entry(graveyard.next,
......
...@@ -470,7 +470,6 @@ void rxrpc_destroy_all_connections(struct rxrpc_net *rxnet) ...@@ -470,7 +470,6 @@ void rxrpc_destroy_all_connections(struct rxrpc_net *rxnet)
_enter(""); _enter("");
atomic_dec(&rxnet->nr_conns); atomic_dec(&rxnet->nr_conns);
rxrpc_destroy_all_client_connections(rxnet);
del_timer_sync(&rxnet->service_conn_reap_timer); del_timer_sync(&rxnet->service_conn_reap_timer);
rxrpc_queue_work(&rxnet->service_conn_reaper); rxrpc_queue_work(&rxnet->service_conn_reaper);
......
...@@ -435,6 +435,10 @@ int rxrpc_io_thread(void *data) ...@@ -435,6 +435,10 @@ int rxrpc_io_thread(void *data)
continue; continue;
} }
if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
&local->client_conn_flags))
rxrpc_discard_expired_client_conns(local);
/* Deal with calls that want immediate attention. */ /* Deal with calls that want immediate attention. */
if ((call = list_first_entry_or_null(&local->call_attend_q, if ((call = list_first_entry_or_null(&local->call_attend_q,
struct rxrpc_call, struct rxrpc_call,
......
...@@ -82,6 +82,16 @@ static long rxrpc_local_cmp_key(const struct rxrpc_local *local, ...@@ -82,6 +82,16 @@ static long rxrpc_local_cmp_key(const struct rxrpc_local *local,
} }
} }
static void rxrpc_client_conn_reap_timeout(struct timer_list *timer)
{
struct rxrpc_local *local =
container_of(timer, struct rxrpc_local, client_conn_reap_timer);
if (local->kill_all_client_conns &&
test_and_set_bit(RXRPC_CLIENT_CONN_REAP_TIMER, &local->client_conn_flags))
rxrpc_wake_up_io_thread(local);
}
/* /*
* Allocate a new local endpoint. * Allocate a new local endpoint.
*/ */
...@@ -103,8 +113,15 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net, ...@@ -103,8 +113,15 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
skb_queue_head_init(&local->rx_queue); skb_queue_head_init(&local->rx_queue);
INIT_LIST_HEAD(&local->conn_attend_q); INIT_LIST_HEAD(&local->conn_attend_q);
INIT_LIST_HEAD(&local->call_attend_q); INIT_LIST_HEAD(&local->call_attend_q);
local->client_bundles = RB_ROOT; local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock); spin_lock_init(&local->client_bundles_lock);
local->kill_all_client_conns = false;
spin_lock_init(&local->client_conn_cache_lock);
INIT_LIST_HEAD(&local->idle_client_conns);
timer_setup(&local->client_conn_reap_timer,
rxrpc_client_conn_reap_timeout, 0);
spin_lock_init(&local->lock); spin_lock_init(&local->lock);
rwlock_init(&local->services_lock); rwlock_init(&local->services_lock);
local->debug_id = atomic_inc_return(&rxrpc_debug_id); local->debug_id = atomic_inc_return(&rxrpc_debug_id);
......
...@@ -10,15 +10,6 @@ ...@@ -10,15 +10,6 @@
unsigned int rxrpc_net_id; unsigned int rxrpc_net_id;
static void rxrpc_client_conn_reap_timeout(struct timer_list *timer)
{
struct rxrpc_net *rxnet =
container_of(timer, struct rxrpc_net, client_conn_reap_timer);
if (rxnet->live)
rxrpc_queue_work(&rxnet->client_conn_reaper);
}
static void rxrpc_service_conn_reap_timeout(struct timer_list *timer) static void rxrpc_service_conn_reap_timeout(struct timer_list *timer)
{ {
struct rxrpc_net *rxnet = struct rxrpc_net *rxnet =
...@@ -63,14 +54,6 @@ static __net_init int rxrpc_init_net(struct net *net) ...@@ -63,14 +54,6 @@ static __net_init int rxrpc_init_net(struct net *net)
rxrpc_service_conn_reap_timeout, 0); rxrpc_service_conn_reap_timeout, 0);
atomic_set(&rxnet->nr_client_conns, 0); atomic_set(&rxnet->nr_client_conns, 0);
rxnet->kill_all_client_conns = false;
spin_lock_init(&rxnet->client_conn_cache_lock);
mutex_init(&rxnet->client_conn_discard_lock);
INIT_LIST_HEAD(&rxnet->idle_client_conns);
INIT_WORK(&rxnet->client_conn_reaper,
rxrpc_discard_expired_client_conns);
timer_setup(&rxnet->client_conn_reap_timer,
rxrpc_client_conn_reap_timeout, 0);
INIT_HLIST_HEAD(&rxnet->local_endpoints); INIT_HLIST_HEAD(&rxnet->local_endpoints);
mutex_init(&rxnet->local_mutex); mutex_init(&rxnet->local_mutex);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment