Commit f3344303 authored by David Howells's avatar David Howells

rxrpc: Fix error distribution

Fix error distribution by immediately delivering the errors to all the
affected calls rather than deferring them to a worker thread.  The problem
with the latter is that retries and things can happen in the meantime when we
want to stop that sooner.

To this end:

 (1) Stop the error distributor from removing calls from the error_targets
     list so that peer->lock isn't needed to synchronise against other adds
     and removals.

 (2) Require the peer's error_targets list to be accessed with RCU, thereby
     avoiding the need to take peer->lock over distribution.

 (3) Don't attempt to affect a call's state if it is already marked complete.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent 37a675e7
...@@ -56,7 +56,6 @@ enum rxrpc_peer_trace { ...@@ -56,7 +56,6 @@ enum rxrpc_peer_trace {
rxrpc_peer_new, rxrpc_peer_new,
rxrpc_peer_processing, rxrpc_peer_processing,
rxrpc_peer_put, rxrpc_peer_put,
rxrpc_peer_queued_error,
}; };
enum rxrpc_conn_trace { enum rxrpc_conn_trace {
...@@ -257,8 +256,7 @@ enum rxrpc_tx_point { ...@@ -257,8 +256,7 @@ enum rxrpc_tx_point {
EM(rxrpc_peer_got, "GOT") \ EM(rxrpc_peer_got, "GOT") \
EM(rxrpc_peer_new, "NEW") \ EM(rxrpc_peer_new, "NEW") \
EM(rxrpc_peer_processing, "PRO") \ EM(rxrpc_peer_processing, "PRO") \
EM(rxrpc_peer_put, "PUT") \ E_(rxrpc_peer_put, "PUT")
E_(rxrpc_peer_queued_error, "QER")
#define rxrpc_conn_traces \ #define rxrpc_conn_traces \
EM(rxrpc_conn_got, "GOT") \ EM(rxrpc_conn_got, "GOT") \
......
...@@ -288,7 +288,6 @@ struct rxrpc_peer { ...@@ -288,7 +288,6 @@ struct rxrpc_peer {
struct hlist_node hash_link; struct hlist_node hash_link;
struct rxrpc_local *local; struct rxrpc_local *local;
struct hlist_head error_targets; /* targets for net error distribution */ struct hlist_head error_targets; /* targets for net error distribution */
struct work_struct error_distributor;
struct rb_root service_conns; /* Service connections */ struct rb_root service_conns; /* Service connections */
struct list_head keepalive_link; /* Link in net->peer_keepalive[] */ struct list_head keepalive_link; /* Link in net->peer_keepalive[] */
time64_t last_tx_at; /* Last time packet sent here */ time64_t last_tx_at; /* Last time packet sent here */
...@@ -299,8 +298,6 @@ struct rxrpc_peer { ...@@ -299,8 +298,6 @@ struct rxrpc_peer {
unsigned int maxdata; /* data size (MTU - hdrsize) */ unsigned int maxdata; /* data size (MTU - hdrsize) */
unsigned short hdrsize; /* header size (IP + UDP + RxRPC) */ unsigned short hdrsize; /* header size (IP + UDP + RxRPC) */
int debug_id; /* debug ID for printks */ int debug_id; /* debug ID for printks */
int error_report; /* Net (+0) or local (+1000000) to distribute */
#define RXRPC_LOCAL_ERROR_OFFSET 1000000
struct sockaddr_rxrpc srx; /* remote address */ struct sockaddr_rxrpc srx; /* remote address */
/* calculated RTT cache */ /* calculated RTT cache */
...@@ -1039,7 +1036,6 @@ void rxrpc_send_keepalive(struct rxrpc_peer *); ...@@ -1039,7 +1036,6 @@ void rxrpc_send_keepalive(struct rxrpc_peer *);
* peer_event.c * peer_event.c
*/ */
void rxrpc_error_report(struct sock *); void rxrpc_error_report(struct sock *);
void rxrpc_peer_error_distributor(struct work_struct *);
void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace, void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t); rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
void rxrpc_peer_keepalive_worker(struct work_struct *); void rxrpc_peer_keepalive_worker(struct work_struct *);
...@@ -1057,7 +1053,6 @@ void rxrpc_destroy_all_peers(struct rxrpc_net *); ...@@ -1057,7 +1053,6 @@ void rxrpc_destroy_all_peers(struct rxrpc_net *);
struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *); struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *);
struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *); struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *);
void rxrpc_put_peer(struct rxrpc_peer *); void rxrpc_put_peer(struct rxrpc_peer *);
void __rxrpc_queue_peer_error(struct rxrpc_peer *);
/* /*
* proc.c * proc.c
......
...@@ -400,7 +400,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, ...@@ -400,7 +400,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
rcu_assign_pointer(conn->channels[chan].call, call); rcu_assign_pointer(conn->channels[chan].call, call);
spin_lock(&conn->params.peer->lock); spin_lock(&conn->params.peer->lock);
hlist_add_head(&call->error_link, &conn->params.peer->error_targets); hlist_add_head_rcu(&call->error_link, &conn->params.peer->error_targets);
spin_unlock(&conn->params.peer->lock); spin_unlock(&conn->params.peer->lock);
_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id); _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
......
...@@ -710,7 +710,7 @@ int rxrpc_connect_call(struct rxrpc_call *call, ...@@ -710,7 +710,7 @@ int rxrpc_connect_call(struct rxrpc_call *call,
} }
spin_lock_bh(&call->conn->params.peer->lock); spin_lock_bh(&call->conn->params.peer->lock);
hlist_add_head(&call->error_link, hlist_add_head_rcu(&call->error_link,
&call->conn->params.peer->error_targets); &call->conn->params.peer->error_targets);
spin_unlock_bh(&call->conn->params.peer->lock); spin_unlock_bh(&call->conn->params.peer->lock);
......
...@@ -216,7 +216,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call) ...@@ -216,7 +216,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
call->peer->cong_cwnd = call->cong_cwnd; call->peer->cong_cwnd = call->cong_cwnd;
spin_lock_bh(&conn->params.peer->lock); spin_lock_bh(&conn->params.peer->lock);
hlist_del_init(&call->error_link); hlist_del_rcu(&call->error_link);
spin_unlock_bh(&conn->params.peer->lock); spin_unlock_bh(&conn->params.peer->lock);
if (rxrpc_is_client_call(call)) if (rxrpc_is_client_call(call))
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "ar-internal.h" #include "ar-internal.h"
static void rxrpc_store_error(struct rxrpc_peer *, struct sock_exterr_skb *); static void rxrpc_store_error(struct rxrpc_peer *, struct sock_exterr_skb *);
static void rxrpc_distribute_error(struct rxrpc_peer *, int,
enum rxrpc_call_completion);
/* /*
* Find the peer associated with an ICMP packet. * Find the peer associated with an ICMP packet.
...@@ -194,8 +196,6 @@ void rxrpc_error_report(struct sock *sk) ...@@ -194,8 +196,6 @@ void rxrpc_error_report(struct sock *sk)
rcu_read_unlock(); rcu_read_unlock();
rxrpc_free_skb(skb, rxrpc_skb_rx_freed); rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
/* The ref we obtained is passed off to the work item */
__rxrpc_queue_peer_error(peer);
_leave(""); _leave("");
} }
...@@ -205,6 +205,7 @@ void rxrpc_error_report(struct sock *sk) ...@@ -205,6 +205,7 @@ void rxrpc_error_report(struct sock *sk)
static void rxrpc_store_error(struct rxrpc_peer *peer, static void rxrpc_store_error(struct rxrpc_peer *peer,
struct sock_exterr_skb *serr) struct sock_exterr_skb *serr)
{ {
enum rxrpc_call_completion compl = RXRPC_CALL_NETWORK_ERROR;
struct sock_extended_err *ee; struct sock_extended_err *ee;
int err; int err;
...@@ -255,7 +256,7 @@ static void rxrpc_store_error(struct rxrpc_peer *peer, ...@@ -255,7 +256,7 @@ static void rxrpc_store_error(struct rxrpc_peer *peer,
case SO_EE_ORIGIN_NONE: case SO_EE_ORIGIN_NONE:
case SO_EE_ORIGIN_LOCAL: case SO_EE_ORIGIN_LOCAL:
_proto("Rx Received local error { error=%d }", err); _proto("Rx Received local error { error=%d }", err);
err += RXRPC_LOCAL_ERROR_OFFSET; compl = RXRPC_CALL_LOCAL_ERROR;
break; break;
case SO_EE_ORIGIN_ICMP6: case SO_EE_ORIGIN_ICMP6:
...@@ -264,48 +265,23 @@ static void rxrpc_store_error(struct rxrpc_peer *peer, ...@@ -264,48 +265,23 @@ static void rxrpc_store_error(struct rxrpc_peer *peer,
break; break;
} }
peer->error_report = err; rxrpc_distribute_error(peer, err, compl);
} }
/* /*
* Distribute an error that occurred on a peer * Distribute an error that occurred on a peer.
*/ */
void rxrpc_peer_error_distributor(struct work_struct *work) static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error,
enum rxrpc_call_completion compl)
{ {
struct rxrpc_peer *peer =
container_of(work, struct rxrpc_peer, error_distributor);
struct rxrpc_call *call; struct rxrpc_call *call;
enum rxrpc_call_completion compl;
int error;
_enter("");
error = READ_ONCE(peer->error_report);
if (error < RXRPC_LOCAL_ERROR_OFFSET) {
compl = RXRPC_CALL_NETWORK_ERROR;
} else {
compl = RXRPC_CALL_LOCAL_ERROR;
error -= RXRPC_LOCAL_ERROR_OFFSET;
}
_debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error);
spin_lock_bh(&peer->lock); hlist_for_each_entry_rcu(call, &peer->error_targets, error_link) {
while (!hlist_empty(&peer->error_targets)) {
call = hlist_entry(peer->error_targets.first,
struct rxrpc_call, error_link);
hlist_del_init(&call->error_link);
rxrpc_see_call(call); rxrpc_see_call(call);
if (call->state < RXRPC_CALL_COMPLETE &&
if (rxrpc_set_call_completion(call, compl, 0, -error)) rxrpc_set_call_completion(call, compl, 0, -error))
rxrpc_notify_socket(call); rxrpc_notify_socket(call);
} }
spin_unlock_bh(&peer->lock);
rxrpc_put_peer(peer);
_leave("");
} }
/* /*
......
...@@ -220,8 +220,6 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp) ...@@ -220,8 +220,6 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
atomic_set(&peer->usage, 1); atomic_set(&peer->usage, 1);
peer->local = local; peer->local = local;
INIT_HLIST_HEAD(&peer->error_targets); INIT_HLIST_HEAD(&peer->error_targets);
INIT_WORK(&peer->error_distributor,
&rxrpc_peer_error_distributor);
peer->service_conns = RB_ROOT; peer->service_conns = RB_ROOT;
seqlock_init(&peer->service_conn_lock); seqlock_init(&peer->service_conn_lock);
spin_lock_init(&peer->lock); spin_lock_init(&peer->lock);
...@@ -402,21 +400,6 @@ struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *peer) ...@@ -402,21 +400,6 @@ struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *peer)
return peer; return peer;
} }
/*
* Queue a peer record. This passes the caller's ref to the workqueue.
*/
void __rxrpc_queue_peer_error(struct rxrpc_peer *peer)
{
const void *here = __builtin_return_address(0);
int n;
n = atomic_read(&peer->usage);
if (rxrpc_queue_work(&peer->error_distributor))
trace_rxrpc_peer(peer, rxrpc_peer_queued_error, n, here);
else
rxrpc_put_peer(peer);
}
/* /*
* Discard a peer record. * Discard a peer record.
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment