Commit d41b3f5b authored by David Howells's avatar David Howells

rxrpc: Wrap accesses to get call state to put the barrier in one place

Wrap accesses to get the state of a call from outside of the I/O thread in
a single place so that the barrier needed to order wrt the error code and
abort code is in just that place.

Also use a barrier when setting the call state and again when reading the
call state such that the auxiliary completion info (error code, abort code)
can be read without taking a read lock on the call state lock.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 0b9bb322
...@@ -379,7 +379,7 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call); ...@@ -379,7 +379,7 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
bool rxrpc_kernel_check_life(const struct socket *sock, bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call) const struct rxrpc_call *call)
{ {
return call->state != RXRPC_CALL_COMPLETE; return !rxrpc_call_is_complete(call);
} }
EXPORT_SYMBOL(rxrpc_kernel_check_life); EXPORT_SYMBOL(rxrpc_kernel_check_life);
......
...@@ -903,6 +903,22 @@ bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq, ...@@ -903,6 +903,22 @@ bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq, bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
u32 abort_code, int error, enum rxrpc_abort_reason why); u32 abort_code, int error, enum rxrpc_abort_reason why);
static inline enum rxrpc_call_state rxrpc_call_state(const struct rxrpc_call *call)
{
/* Order read ->state before read ->error. */
return smp_load_acquire(&call->state);
}
static inline bool rxrpc_call_is_complete(const struct rxrpc_call *call)
{
return rxrpc_call_state(call) == RXRPC_CALL_COMPLETE;
}
static inline bool rxrpc_call_has_failed(const struct rxrpc_call *call)
{
return rxrpc_call_is_complete(call) && call->completion != RXRPC_CALL_SUCCEEDED;
}
/* /*
* conn_client.c * conn_client.c
*/ */
......
...@@ -19,7 +19,8 @@ bool __rxrpc_set_call_completion(struct rxrpc_call *call, ...@@ -19,7 +19,8 @@ bool __rxrpc_set_call_completion(struct rxrpc_call *call,
call->abort_code = abort_code; call->abort_code = abort_code;
call->error = error; call->error = error;
call->completion = compl; call->completion = compl;
call->state = RXRPC_CALL_COMPLETE; /* Allow reader of completion state to operate locklessly */
smp_store_release(&call->state, RXRPC_CALL_COMPLETE);
trace_rxrpc_call_complete(call); trace_rxrpc_call_complete(call);
wake_up(&call->waitq); wake_up(&call->waitq);
rxrpc_notify_socket(call); rxrpc_notify_socket(call);
......
...@@ -89,7 +89,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) ...@@ -89,7 +89,7 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp); ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
break; break;
default: default:
pr_err("Invalid terminal call state %u\n", call->state); pr_err("Invalid terminal call state %u\n", call->completion);
BUG(); BUG();
break; break;
} }
...@@ -111,7 +111,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) ...@@ -111,7 +111,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh); trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack); rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
write_lock(&call->state_lock); write_lock(&call->state_lock);
...@@ -210,7 +210,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, ...@@ -210,7 +210,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
rx_pkt_offset = call->rx_pkt_offset; rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len; rx_pkt_len = call->rx_pkt_len;
if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) { if (rxrpc_call_state(call) >= RXRPC_CALL_SERVER_ACK_REQUEST) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1; seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1; ret = 1;
goto done; goto done;
...@@ -416,7 +416,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, ...@@ -416,7 +416,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
msg->msg_namelen = len; msg->msg_namelen = len;
} }
switch (READ_ONCE(call->state)) { switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_RECV_REPLY: case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST: case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST: case RXRPC_CALL_SERVER_ACK_REQUEST:
...@@ -436,7 +436,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, ...@@ -436,7 +436,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (ret < 0) if (ret < 0)
goto error_unlock_call; goto error_unlock_call;
if (call->state == RXRPC_CALL_COMPLETE) { if (rxrpc_call_is_complete(call)) {
ret = rxrpc_recvmsg_term(call, msg); ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0) if (ret < 0)
goto error_unlock_call; goto error_unlock_call;
...@@ -516,7 +516,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, ...@@ -516,7 +516,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
mutex_lock(&call->user_mutex); mutex_lock(&call->user_mutex);
switch (READ_ONCE(call->state)) { switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_RECV_REPLY: case RXRPC_CALL_CLIENT_RECV_REPLY:
case RXRPC_CALL_SERVER_RECV_REQUEST: case RXRPC_CALL_SERVER_RECV_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST: case RXRPC_CALL_SERVER_ACK_REQUEST:
......
...@@ -25,7 +25,7 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error, ...@@ -25,7 +25,7 @@ bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
{ {
_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why); _enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) { if (!call->send_abort && !rxrpc_call_is_complete(call)) {
call->send_abort_why = why; call->send_abort_why = why;
call->send_abort_err = error; call->send_abort_err = error;
call->send_abort_seq = 0; call->send_abort_seq = 0;
...@@ -60,7 +60,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx, ...@@ -60,7 +60,7 @@ static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
if (rxrpc_check_tx_space(call, NULL)) if (rxrpc_check_tx_space(call, NULL))
return 0; return 0;
if (call->state >= RXRPC_CALL_COMPLETE) if (rxrpc_call_is_complete(call))
return call->error; return call->error;
if (signal_pending(current)) if (signal_pending(current))
...@@ -95,7 +95,7 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx, ...@@ -95,7 +95,7 @@ static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
if (rxrpc_check_tx_space(call, &tx_win)) if (rxrpc_check_tx_space(call, &tx_win))
return 0; return 0;
if (call->state >= RXRPC_CALL_COMPLETE) if (rxrpc_call_is_complete(call))
return call->error; return call->error;
if (timeout == 0 && if (timeout == 0 &&
...@@ -124,7 +124,7 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx, ...@@ -124,7 +124,7 @@ static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
if (rxrpc_check_tx_space(call, NULL)) if (rxrpc_check_tx_space(call, NULL))
return 0; return 0;
if (call->state >= RXRPC_CALL_COMPLETE) if (rxrpc_call_is_complete(call))
return call->error; return call->error;
trace_rxrpc_txqueue(call, rxrpc_txqueue_wait); trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
...@@ -273,7 +273,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, ...@@ -273,7 +273,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
ret = -EPIPE; ret = -EPIPE;
if (sk->sk_shutdown & SEND_SHUTDOWN) if (sk->sk_shutdown & SEND_SHUTDOWN)
goto maybe_error; goto maybe_error;
state = READ_ONCE(call->state); state = rxrpc_call_state(call);
ret = -ESHUTDOWN; ret = -ESHUTDOWN;
if (state >= RXRPC_CALL_COMPLETE) if (state >= RXRPC_CALL_COMPLETE)
goto maybe_error; goto maybe_error;
...@@ -350,7 +350,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, ...@@ -350,7 +350,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
/* check for the far side aborting the call or a network error /* check for the far side aborting the call or a network error
* occurring */ * occurring */
if (call->state == RXRPC_CALL_COMPLETE) if (rxrpc_call_is_complete(call))
goto call_terminated; goto call_terminated;
/* add the packet to the send queue if it's now full */ /* add the packet to the send queue if it's now full */
...@@ -375,12 +375,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx, ...@@ -375,12 +375,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
success: success:
ret = copied; ret = copied;
if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) { if (rxrpc_call_is_complete(call) &&
read_lock(&call->state_lock); call->error < 0)
if (call->error < 0)
ret = call->error; ret = call->error;
read_unlock(&call->state_lock);
}
out: out:
call->tx_pending = txb; call->tx_pending = txb;
_leave(" = %d", ret); _leave(" = %d", ret);
...@@ -618,10 +615,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ...@@ -618,10 +615,10 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
return PTR_ERR(call); return PTR_ERR(call);
/* ... and we have the call lock. */ /* ... and we have the call lock. */
ret = 0; ret = 0;
if (READ_ONCE(call->state) == RXRPC_CALL_COMPLETE) if (rxrpc_call_is_complete(call))
goto out_put_unlock; goto out_put_unlock;
} else { } else {
switch (READ_ONCE(call->state)) { switch (rxrpc_call_state(call)) {
case RXRPC_CALL_UNINITIALISED: case RXRPC_CALL_UNINITIALISED:
case RXRPC_CALL_CLIENT_AWAIT_CONN: case RXRPC_CALL_CLIENT_AWAIT_CONN:
case RXRPC_CALL_SERVER_PREALLOC: case RXRPC_CALL_SERVER_PREALLOC:
...@@ -675,7 +672,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len) ...@@ -675,7 +672,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
break; break;
} }
state = READ_ONCE(call->state); state = rxrpc_call_state(call);
_debug("CALL %d USR %lx ST %d on CONN %p", _debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, state, call->conn); call->debug_id, call->user_call_ID, state, call->conn);
...@@ -735,7 +732,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call, ...@@ -735,7 +732,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
_debug("CALL %d USR %lx ST %d on CONN %p", _debug("CALL %d USR %lx ST %d on CONN %p",
call->debug_id, call->user_call_ID, call->state, call->conn); call->debug_id, call->user_call_ID, call->state, call->conn);
switch (READ_ONCE(call->state)) { switch (rxrpc_call_state(call)) {
case RXRPC_CALL_CLIENT_SEND_REQUEST: case RXRPC_CALL_CLIENT_SEND_REQUEST:
case RXRPC_CALL_SERVER_ACK_REQUEST: case RXRPC_CALL_SERVER_ACK_REQUEST:
case RXRPC_CALL_SERVER_SEND_REPLY: case RXRPC_CALL_SERVER_SEND_REPLY:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment