Commit 446b3e14 authored by David Howells's avatar David Howells

rxrpc: Move packet reception processing into I/O thread

Split the packet input handler to make the softirq side just dump the
received packet into the local endpoint receive queue and then call the
remainder of the input function from the I/O thread.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent a275da62
...@@ -36,6 +36,7 @@ struct rxrpc_txbuf; ...@@ -36,6 +36,7 @@ struct rxrpc_txbuf;
* to pass supplementary information. * to pass supplementary information.
*/ */
enum rxrpc_skb_mark { enum rxrpc_skb_mark {
RXRPC_SKB_MARK_PACKET, /* Received packet */
RXRPC_SKB_MARK_REJECT_BUSY, /* Reject with BUSY */ RXRPC_SKB_MARK_REJECT_BUSY, /* Reject with BUSY */
RXRPC_SKB_MARK_REJECT_ABORT, /* Reject with ABORT (code in skb->priority) */ RXRPC_SKB_MARK_REJECT_ABORT, /* Reject with ABORT (code in skb->priority) */
}; };
...@@ -957,7 +958,7 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection ...@@ -957,7 +958,7 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection
/* /*
* io_thread.c * io_thread.c
*/ */
int rxrpc_input_packet(struct sock *, struct sk_buff *); int rxrpc_encap_rcv(struct sock *, struct sk_buff *);
int rxrpc_io_thread(void *data); int rxrpc_io_thread(void *data);
static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local) static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
{ {
......
...@@ -83,7 +83,7 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason, ...@@ -83,7 +83,7 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]); rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK, txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
in_softirq() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS); rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
if (!txb) { if (!txb) {
kleave(" = -ENOMEM"); kleave(" = -ENOMEM");
return; return;
...@@ -111,7 +111,7 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason, ...@@ -111,7 +111,7 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
spin_unlock_bh(&local->ack_tx_lock); spin_unlock_bh(&local->ack_tx_lock);
trace_rxrpc_send_ack(call, why, ack_reason, serial); trace_rxrpc_send_ack(call, why, ack_reason, serial);
if (in_task()) { if (!rcu_read_lock_held()) {
rxrpc_transmit_ack_packets(call->peer->local); rxrpc_transmit_ack_packets(call->peer->local);
} else { } else {
rxrpc_get_local(local, rxrpc_local_get_queue); rxrpc_get_local(local, rxrpc_local_get_queue);
......
...@@ -632,7 +632,7 @@ void rxrpc_cleanup_call(struct rxrpc_call *call) ...@@ -632,7 +632,7 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
del_timer_sync(&call->timer); del_timer_sync(&call->timer);
cancel_work(&call->processor); cancel_work(&call->processor);
if (in_softirq() || work_busy(&call->processor)) if (rcu_read_lock_held() || work_busy(&call->processor))
/* Can't use the rxrpc workqueue as we need to cancel/flush /* Can't use the rxrpc workqueue as we need to cancel/flush
* something that may be running/waiting there. * something that may be running/waiting there.
*/ */
......
...@@ -9,6 +9,34 @@ ...@@ -9,6 +9,34 @@
#include "ar-internal.h" #include "ar-internal.h"
/*
* handle data received on the local endpoint
* - may be called in interrupt context
*
* [!] Note that as this is called from the encap_rcv hook, the socket is not
* held locked by the caller and nothing prevents sk_user_data on the UDP from
* being cleared in the middle of processing this function.
*
* Called with the RCU read lock held from the IP layer via UDP.
*/
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
{
struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
if (unlikely(!local)) {
kfree_skb(skb);
return 0;
}
if (skb->tstamp == 0)
skb->tstamp = ktime_get_real();
skb->mark = RXRPC_SKB_MARK_PACKET;
rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
skb_queue_tail(&local->rx_queue, skb);
rxrpc_wake_up_io_thread(local);
return 0;
}
/* /*
* post connection-level events to the connection * post connection-level events to the connection
* - this includes challenges, responses, some aborts and call terminal packet * - this includes challenges, responses, some aborts and call terminal packet
...@@ -98,18 +126,10 @@ static bool rxrpc_extract_abort(struct sk_buff *skb) ...@@ -98,18 +126,10 @@ static bool rxrpc_extract_abort(struct sk_buff *skb)
} }
/* /*
* handle data received on the local endpoint * Process packets received on the local endpoint
* - may be called in interrupt context
*
* [!] Note that as this is called from the encap_rcv hook, the socket is not
* held locked by the caller and nothing prevents sk_user_data on the UDP from
* being cleared in the middle of processing this function.
*
* Called with the RCU read lock held from the IP layer via UDP.
*/ */
int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb) static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
{ {
struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
struct rxrpc_connection *conn; struct rxrpc_connection *conn;
struct rxrpc_channel *chan; struct rxrpc_channel *chan;
struct rxrpc_call *call = NULL; struct rxrpc_call *call = NULL;
...@@ -118,17 +138,9 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb) ...@@ -118,17 +138,9 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
struct rxrpc_sock *rx = NULL; struct rxrpc_sock *rx = NULL;
unsigned int channel; unsigned int channel;
_enter("%p", udp_sk);
if (unlikely(!local)) {
kfree_skb(skb);
return 0;
}
if (skb->tstamp == 0) if (skb->tstamp == 0)
skb->tstamp = ktime_get_real(); skb->tstamp = ktime_get_real();
rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
skb_pull(skb, sizeof(struct udphdr)); skb_pull(skb, sizeof(struct udphdr));
/* The UDP protocol already released all skb resources; /* The UDP protocol already released all skb resources;
...@@ -387,8 +399,17 @@ int rxrpc_io_thread(void *data) ...@@ -387,8 +399,17 @@ int rxrpc_io_thread(void *data)
/* Process received packets and errors. */ /* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) { if ((skb = __skb_dequeue(&rx_queue))) {
// TODO: Input packet switch (skb->mark) {
rxrpc_free_skb(skb, rxrpc_skb_put_input); case RXRPC_SKB_MARK_PACKET:
rcu_read_lock();
rxrpc_input_packet(local, skb);
rcu_read_unlock();
break;
default:
WARN_ON_ONCE(1);
rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
break;
}
continue; continue;
} }
......
...@@ -154,7 +154,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net) ...@@ -154,7 +154,7 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
} }
tuncfg.encap_type = UDP_ENCAP_RXRPC; tuncfg.encap_type = UDP_ENCAP_RXRPC;
tuncfg.encap_rcv = rxrpc_input_packet; tuncfg.encap_rcv = rxrpc_encap_rcv;
tuncfg.encap_err_rcv = rxrpc_encap_err_rcv; tuncfg.encap_err_rcv = rxrpc_encap_err_rcv;
tuncfg.sk_user_data = local; tuncfg.sk_user_data = local;
setup_udp_tunnel_sock(net, local->socket, &tuncfg); setup_udp_tunnel_sock(net, local->socket, &tuncfg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment