Commit f2cce89a authored by David Howells's avatar David Howells

rxrpc: Implement a mechanism to send an event notification to a connection

Provide a means by which an event notification can be sent to a connection
through such that the I/O thread can pick it up and handle it rather than
doing it in a separate workqueue.

This is then used to move the deferred final ACK of a call into the I/O
thread rather than a separate work queue as part of the drive to do all
transmission from the I/O thread.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 03fc55ad
...@@ -111,7 +111,7 @@ ...@@ -111,7 +111,7 @@
EM(rxrpc_conn_get_call_input, "GET inp-call") \ EM(rxrpc_conn_get_call_input, "GET inp-call") \
EM(rxrpc_conn_get_conn_input, "GET inp-conn") \ EM(rxrpc_conn_get_conn_input, "GET inp-conn") \
EM(rxrpc_conn_get_idle, "GET idle ") \ EM(rxrpc_conn_get_idle, "GET idle ") \
EM(rxrpc_conn_get_poke, "GET poke ") \ EM(rxrpc_conn_get_poke_timer, "GET poke ") \
EM(rxrpc_conn_get_service_conn, "GET svc-conn") \ EM(rxrpc_conn_get_service_conn, "GET svc-conn") \
EM(rxrpc_conn_new_client, "NEW client ") \ EM(rxrpc_conn_new_client, "NEW client ") \
EM(rxrpc_conn_new_service, "NEW service ") \ EM(rxrpc_conn_new_service, "NEW service ") \
...@@ -126,10 +126,9 @@ ...@@ -126,10 +126,9 @@
EM(rxrpc_conn_put_service_reaped, "PUT svc-reap") \ EM(rxrpc_conn_put_service_reaped, "PUT svc-reap") \
EM(rxrpc_conn_put_unbundle, "PUT unbundle") \ EM(rxrpc_conn_put_unbundle, "PUT unbundle") \
EM(rxrpc_conn_put_unidle, "PUT unidle ") \ EM(rxrpc_conn_put_unidle, "PUT unidle ") \
EM(rxrpc_conn_put_work, "PUT work ") \
EM(rxrpc_conn_queue_challenge, "QUE chall ") \ EM(rxrpc_conn_queue_challenge, "QUE chall ") \
EM(rxrpc_conn_queue_retry_work, "QUE retry-wk") \
EM(rxrpc_conn_queue_rx_work, "QUE rx-work ") \ EM(rxrpc_conn_queue_rx_work, "QUE rx-work ") \
EM(rxrpc_conn_queue_timer, "QUE timer ") \
EM(rxrpc_conn_see_new_service_conn, "SEE new-svc ") \ EM(rxrpc_conn_see_new_service_conn, "SEE new-svc ") \
EM(rxrpc_conn_see_reap_service, "SEE reap-svc") \ EM(rxrpc_conn_see_reap_service, "SEE reap-svc") \
E_(rxrpc_conn_see_work, "SEE work ") E_(rxrpc_conn_see_work, "SEE work ")
......
...@@ -202,6 +202,7 @@ struct rxrpc_host_header { ...@@ -202,6 +202,7 @@ struct rxrpc_host_header {
* - max 48 bytes (struct sk_buff::cb) * - max 48 bytes (struct sk_buff::cb)
*/ */
struct rxrpc_skb_priv { struct rxrpc_skb_priv {
struct rxrpc_connection *conn; /* Connection referred to (poke packet) */
u16 offset; /* Offset of data */ u16 offset; /* Offset of data */
u16 len; /* Length of data */ u16 len; /* Length of data */
u8 flags; u8 flags;
...@@ -292,6 +293,7 @@ struct rxrpc_local { ...@@ -292,6 +293,7 @@ struct rxrpc_local {
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */ struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */ struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
struct sk_buff_head rx_queue; /* Received packets */ struct sk_buff_head rx_queue; /* Received packets */
struct list_head conn_attend_q; /* Conns requiring immediate attention */
struct list_head call_attend_q; /* Calls requiring immediate attention */ struct list_head call_attend_q; /* Calls requiring immediate attention */
struct rb_root client_bundles; /* Client connection bundles by socket params */ struct rb_root client_bundles; /* Client connection bundles by socket params */
spinlock_t client_bundles_lock; /* Lock for client_bundles */ spinlock_t client_bundles_lock; /* Lock for client_bundles */
...@@ -441,6 +443,7 @@ struct rxrpc_connection { ...@@ -441,6 +443,7 @@ struct rxrpc_connection {
struct rxrpc_peer *peer; /* Remote endpoint */ struct rxrpc_peer *peer; /* Remote endpoint */
struct rxrpc_net *rxnet; /* Network namespace to which call belongs */ struct rxrpc_net *rxnet; /* Network namespace to which call belongs */
struct key *key; /* Security details */ struct key *key; /* Security details */
struct list_head attend_link; /* Link in local->conn_attend_q */
refcount_t ref; refcount_t ref;
atomic_t active; /* Active count for service conns */ atomic_t active; /* Active count for service conns */
...@@ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, struct sk_buff *s ...@@ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, struct sk_buff *s
void rxrpc_process_connection(struct work_struct *); void rxrpc_process_connection(struct work_struct *);
void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool); void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb);
/* /*
* conn_object.c * conn_object.c
...@@ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb); ...@@ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
extern unsigned int rxrpc_connection_expiry; extern unsigned int rxrpc_connection_expiry;
extern unsigned int rxrpc_closed_conn_expiry; extern unsigned int rxrpc_closed_conn_expiry;
void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why);
struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t); struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *, struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *,
struct sockaddr_rxrpc *, struct sockaddr_rxrpc *,
......
...@@ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn) ...@@ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn)
if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
rxrpc_secure_connection(conn); rxrpc_secure_connection(conn);
/* Process delayed ACKs whose time has come. */
if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
rxrpc_process_delayed_final_acks(conn, false);
/* go through the conn-level event packets, releasing the ref on this /* go through the conn-level event packets, releasing the ref on this
* connection that each one has when we've finished with it */ * connection that each one has when we've finished with it */
while ((skb = skb_dequeue(&conn->rx_queue))) { while ((skb = skb_dequeue(&conn->rx_queue))) {
...@@ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb) ...@@ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
return -EPROTO; return -EPROTO;
} }
} }
/*
* Input a connection event.
*/
void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
{
/* Process delayed ACKs whose time has come. */
if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
rxrpc_process_delayed_final_acks(conn, false);
}
...@@ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work); ...@@ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work);
static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet, static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
unsigned long reap_at); unsigned long reap_at);
void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
{
struct rxrpc_local *local = conn->local;
bool busy;
if (WARN_ON_ONCE(!local))
return;
spin_lock_bh(&local->lock);
busy = !list_empty(&conn->attend_link);
if (!busy) {
rxrpc_get_connection(conn, why);
list_add_tail(&conn->attend_link, &local->conn_attend_q);
}
spin_unlock_bh(&local->lock);
rxrpc_wake_up_io_thread(local);
}
static void rxrpc_connection_timer(struct timer_list *timer) static void rxrpc_connection_timer(struct timer_list *timer)
{ {
struct rxrpc_connection *conn = struct rxrpc_connection *conn =
container_of(timer, struct rxrpc_connection, timer); container_of(timer, struct rxrpc_connection, timer);
rxrpc_queue_conn(conn, rxrpc_conn_queue_timer); rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer);
} }
/* /*
......
...@@ -421,6 +421,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, ...@@ -421,6 +421,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
*/ */
int rxrpc_io_thread(void *data) int rxrpc_io_thread(void *data)
{ {
struct rxrpc_connection *conn;
struct sk_buff_head rx_queue; struct sk_buff_head rx_queue;
struct rxrpc_local *local = data; struct rxrpc_local *local = data;
struct rxrpc_call *call; struct rxrpc_call *call;
...@@ -436,6 +437,20 @@ int rxrpc_io_thread(void *data) ...@@ -436,6 +437,20 @@ int rxrpc_io_thread(void *data)
for (;;) { for (;;) {
rxrpc_inc_stat(local->rxnet, stat_io_loop); rxrpc_inc_stat(local->rxnet, stat_io_loop);
/* Deal with connections that want immediate attention. */
conn = list_first_entry_or_null(&local->conn_attend_q,
struct rxrpc_connection,
attend_link);
if (conn) {
spin_lock_bh(&local->lock);
list_del_init(&conn->attend_link);
spin_unlock_bh(&local->lock);
rxrpc_input_conn_event(conn, NULL);
rxrpc_put_connection(conn, rxrpc_conn_put_poke);
continue;
}
/* Deal with calls that want immediate attention. */ /* Deal with calls that want immediate attention. */
if ((call = list_first_entry_or_null(&local->call_attend_q, if ((call = list_first_entry_or_null(&local->call_attend_q,
struct rxrpc_call, struct rxrpc_call,
...@@ -463,6 +478,7 @@ int rxrpc_io_thread(void *data) ...@@ -463,6 +478,7 @@ int rxrpc_io_thread(void *data)
rxrpc_input_error(local, skb); rxrpc_input_error(local, skb);
rxrpc_free_skb(skb, rxrpc_skb_put_error_report); rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
break; break;
break;
default: default:
WARN_ON_ONCE(1); WARN_ON_ONCE(1);
rxrpc_free_skb(skb, rxrpc_skb_put_unknown); rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
...@@ -481,7 +497,8 @@ int rxrpc_io_thread(void *data) ...@@ -481,7 +497,8 @@ int rxrpc_io_thread(void *data)
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
should_stop = kthread_should_stop(); should_stop = kthread_should_stop();
if (!skb_queue_empty(&local->rx_queue) || if (!skb_queue_empty(&local->rx_queue) ||
!list_empty(&local->call_attend_q)) { !list_empty(&local->call_attend_q) ||
!list_empty(&local->conn_attend_q)) {
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
continue; continue;
} }
......
...@@ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net, ...@@ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
init_rwsem(&local->defrag_sem); init_rwsem(&local->defrag_sem);
init_completion(&local->io_thread_ready); init_completion(&local->io_thread_ready);
skb_queue_head_init(&local->rx_queue); skb_queue_head_init(&local->rx_queue);
INIT_LIST_HEAD(&local->conn_attend_q);
INIT_LIST_HEAD(&local->call_attend_q); INIT_LIST_HEAD(&local->call_attend_q);
local->client_bundles = RB_ROOT; local->client_bundles = RB_ROOT;
spin_lock_init(&local->client_bundles_lock); spin_lock_init(&local->client_bundles_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment