Commit a343b174 authored by David Howells's avatar David Howells

rxrpc: Only set/transmit aborts in the I/O thread

Only set the abort call completion state in the I/O thread and only
transmit ABORT packets from there.  rxrpc_abort_call() can then be made to
actually send the packet.

Further, ABORT packets should only be sent if the call has been exposed to
the network (ie. at least one attempted DATA transmission has occurred for
it).
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 30df927b
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
* Declare tracing information enums and their string mappings for display. * Declare tracing information enums and their string mappings for display.
*/ */
#define rxrpc_call_poke_traces \ #define rxrpc_call_poke_traces \
EM(rxrpc_call_poke_abort, "Abort") \
EM(rxrpc_call_poke_error, "Error") \ EM(rxrpc_call_poke_error, "Error") \
EM(rxrpc_call_poke_idle, "Idle") \ EM(rxrpc_call_poke_idle, "Idle") \
EM(rxrpc_call_poke_start, "Start") \ EM(rxrpc_call_poke_start, "Start") \
......
...@@ -625,7 +625,10 @@ struct rxrpc_call { ...@@ -625,7 +625,10 @@ struct rxrpc_call {
unsigned long events; unsigned long events;
spinlock_t notify_lock; /* Kernel notification lock */ spinlock_t notify_lock; /* Kernel notification lock */
rwlock_t state_lock; /* lock for state transition */ rwlock_t state_lock; /* lock for state transition */
u32 abort_code; /* Local/remote abort code */ const char *send_abort_why; /* String indicating why the abort was sent */
s32 send_abort; /* Abort code to be sent */
short send_abort_err; /* Error to be associated with the abort */
s32 abort_code; /* Local/remote abort code */
int error; /* Local error incurred */ int error; /* Local error incurred */
enum rxrpc_call_state state; /* current state of call */ enum rxrpc_call_state state; /* current state of call */
enum rxrpc_call_completion completion; /* Call completion condition */ enum rxrpc_call_completion completion; /* Call completion condition */
...@@ -1146,6 +1149,8 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *, ...@@ -1146,6 +1149,8 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *,
/* /*
* sendmsg.c * sendmsg.c
*/ */
bool rxrpc_propose_abort(struct rxrpc_call *call,
u32 abort_code, int error, const char *why);
int rxrpc_do_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t); int rxrpc_do_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t);
/* /*
......
...@@ -270,9 +270,11 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call) ...@@ -270,9 +270,11 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
{ {
struct rxrpc_txbuf *txb; struct rxrpc_txbuf *txb;
if (rxrpc_is_client_call(call) && if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) if (list_empty(&call->tx_sendmsg))
return;
rxrpc_expose_client_call(call); rxrpc_expose_client_call(call);
}
while ((txb = list_first_entry_or_null(&call->tx_sendmsg, while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
struct rxrpc_txbuf, call_link))) { struct rxrpc_txbuf, call_link))) {
...@@ -336,6 +338,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -336,6 +338,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
unsigned long now, next, t; unsigned long now, next, t;
rxrpc_serial_t ackr_serial; rxrpc_serial_t ackr_serial;
bool resend = false, expired = false; bool resend = false, expired = false;
s32 abort_code;
rxrpc_see_call(call, rxrpc_call_see_input); rxrpc_see_call(call, rxrpc_call_see_input);
...@@ -346,6 +349,14 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -346,6 +349,14 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
if (call->state == RXRPC_CALL_COMPLETE) if (call->state == RXRPC_CALL_COMPLETE)
goto out; goto out;
/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
abort_code = smp_load_acquire(&call->send_abort);
if (abort_code) {
rxrpc_abort_call(call->send_abort_why, call, 0, call->send_abort,
call->send_abort_err);
goto out;
}
if (skb && skb->mark == RXRPC_SKB_MARK_ERROR) if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
goto out; goto out;
...@@ -433,7 +444,6 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -433,7 +444,6 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
} else { } else {
rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME); rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
} }
rxrpc_send_abort_packet(call);
goto out; goto out;
} }
......
...@@ -430,6 +430,8 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, ...@@ -430,6 +430,8 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
call->state = RXRPC_CALL_SERVER_SECURING; call->state = RXRPC_CALL_SERVER_SECURING;
call->cong_tstamp = skb->tstamp; call->cong_tstamp = skb->tstamp;
__set_bit(RXRPC_CALL_EXPOSED, &call->flags);
spin_lock(&conn->state_lock); spin_lock(&conn->state_lock);
switch (conn->state) { switch (conn->state) {
...@@ -590,7 +592,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) ...@@ -590,7 +592,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
call = list_entry(rx->to_be_accepted.next, call = list_entry(rx->to_be_accepted.next,
struct rxrpc_call, accept_link); struct rxrpc_call, accept_link);
list_del(&call->accept_link); list_del(&call->accept_link);
rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET); rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, "SKR");
rxrpc_put_call(call, rxrpc_call_put_release_sock_tba); rxrpc_put_call(call, rxrpc_call_put_release_sock_tba);
} }
...@@ -598,8 +600,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) ...@@ -598,8 +600,7 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
call = list_entry(rx->sock_calls.next, call = list_entry(rx->sock_calls.next,
struct rxrpc_call, sock_link); struct rxrpc_call, sock_link);
rxrpc_get_call(call, rxrpc_call_get_release_sock); rxrpc_get_call(call, rxrpc_call_get_release_sock);
rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET); rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, "SKT");
rxrpc_send_abort_packet(call);
rxrpc_release_call(rx, call); rxrpc_release_call(rx, call);
rxrpc_put_call(call, rxrpc_call_put_release_sock); rxrpc_put_call(call, rxrpc_call_put_release_sock);
} }
......
...@@ -12,8 +12,7 @@ ...@@ -12,8 +12,7 @@
static void rxrpc_proto_abort(const char *why, static void rxrpc_proto_abort(const char *why,
struct rxrpc_call *call, rxrpc_seq_t seq) struct rxrpc_call *call, rxrpc_seq_t seq)
{ {
if (rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG)) rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG);
rxrpc_send_abort_packet(call);
} }
/* /*
...@@ -1007,8 +1006,7 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -1007,8 +1006,7 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
case RXRPC_CALL_COMPLETE: case RXRPC_CALL_COMPLETE:
break; break;
default: default:
if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN)) rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN);
rxrpc_send_abort_packet(call);
trace_rxrpc_improper_term(call); trace_rxrpc_improper_term(call);
break; break;
} }
......
...@@ -134,6 +134,8 @@ bool rxrpc_abort_call(const char *why, struct rxrpc_call *call, ...@@ -134,6 +134,8 @@ bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
write_lock(&call->state_lock); write_lock(&call->state_lock);
ret = __rxrpc_abort_call(why, call, seq, abort_code, error); ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
write_unlock(&call->state_lock); write_unlock(&call->state_lock);
if (ret && test_bit(RXRPC_CALL_EXPOSED, &call->flags))
rxrpc_send_abort_packet(call);
return ret; return ret;
} }
......
...@@ -17,6 +17,26 @@ ...@@ -17,6 +17,26 @@
#include <net/af_rxrpc.h> #include <net/af_rxrpc.h>
#include "ar-internal.h" #include "ar-internal.h"
/*
* Propose an abort to be made in the I/O thread.
*/
bool rxrpc_propose_abort(struct rxrpc_call *call,
u32 abort_code, int error, const char *why)
{
_enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
call->send_abort_why = why;
call->send_abort_err = error;
/* Request abort locklessly vs rxrpc_input_call_event(). */
smp_store_release(&call->send_abort, abort_code);
rxrpc_poke_call(call, rxrpc_call_poke_abort);
return true;
}
return false;
}
/* /*
* Return true if there's sufficient Tx queue space. * Return true if there's sufficient Tx queue space.
*/ */
...@@ -663,9 +683,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ...@@ -663,9 +683,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
/* it's too late for this call */ /* it's too late for this call */
ret = -ESHUTDOWN; ret = -ESHUTDOWN;
} else if (p.command == RXRPC_CMD_SEND_ABORT) { } else if (p.command == RXRPC_CMD_SEND_ABORT) {
rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED, "CMD");
ret = 0; ret = 0;
if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED))
ret = rxrpc_send_abort_packet(call);
} else if (p.command != RXRPC_CMD_SEND_DATA) { } else if (p.command != RXRPC_CMD_SEND_DATA) {
ret = -EINVAL; ret = -EINVAL;
} else { } else {
...@@ -760,11 +779,7 @@ bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call, ...@@ -760,11 +779,7 @@ bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
_enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why); _enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
mutex_lock(&call->user_mutex); mutex_lock(&call->user_mutex);
aborted = rxrpc_propose_abort(call, abort_code, error, why);
aborted = rxrpc_abort_call(why, call, 0, abort_code, error);
if (aborted)
rxrpc_send_abort_packet(call);
mutex_unlock(&call->user_mutex); mutex_unlock(&call->user_mutex);
return aborted; return aborted;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment