Commit 03fc55ad authored by David Howells's avatar David Howells

rxrpc: Only disconnect calls in the I/O thread

Only perform call disconnection in the I/O thread to reduce the locking
requirement.

This is the first part of a fix for a race that exists between call
connection and call disconnection whereby the data transmission code adds
the call to the peer error distribution list after the call has been
disconnected (say by the rxrpc socket getting closed).

The fix is to complete the process of moving call connection, data
transmission and call disconnection into the I/O thread and thus forcibly
serialising them.

Note that the issue may predate the overhaul to an I/O thread model that
were included in the merge window for v6.2, but the timing is very much
changed by the change given below.

Fixes: cf37b598 ("rxrpc: Move DATA transmission into call processor work item")
Reported-by: syzbot+c22650d2844392afdcfd@syzkaller.appspotmail.com
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent a343b174
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
*/ */
#define rxrpc_call_poke_traces \ #define rxrpc_call_poke_traces \
EM(rxrpc_call_poke_abort, "Abort") \ EM(rxrpc_call_poke_abort, "Abort") \
EM(rxrpc_call_poke_complete, "Compl") \
EM(rxrpc_call_poke_error, "Error") \ EM(rxrpc_call_poke_error, "Error") \
EM(rxrpc_call_poke_idle, "Idle") \ EM(rxrpc_call_poke_idle, "Idle") \
EM(rxrpc_call_poke_start, "Start") \ EM(rxrpc_call_poke_start, "Start") \
......
...@@ -484,8 +484,13 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -484,8 +484,13 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
} }
out: out:
if (call->state == RXRPC_CALL_COMPLETE) if (call->state == RXRPC_CALL_COMPLETE) {
del_timer_sync(&call->timer); del_timer_sync(&call->timer);
if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
rxrpc_disconnect_call(call);
if (call->security)
call->security->free_call_crypto(call);
}
if (call->acks_hard_ack != call->tx_bottom) if (call->acks_hard_ack != call->tx_bottom)
rxrpc_shrink_call_tx_buffer(call); rxrpc_shrink_call_tx_buffer(call);
_leave(""); _leave("");
......
...@@ -50,7 +50,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what) ...@@ -50,7 +50,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
struct rxrpc_local *local = call->local; struct rxrpc_local *local = call->local;
bool busy; bool busy;
if (call->state < RXRPC_CALL_COMPLETE) { if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) {
spin_lock_bh(&local->lock); spin_lock_bh(&local->lock);
busy = !list_empty(&call->attend_link); busy = !list_empty(&call->attend_link);
trace_rxrpc_poke_call(call, busy, what); trace_rxrpc_poke_call(call, busy, what);
...@@ -533,13 +533,10 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) ...@@ -533,13 +533,10 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
call->flags, rxrpc_call_see_release); call->flags, rxrpc_call_see_release);
ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags)) if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
BUG(); BUG();
rxrpc_put_call_slot(call); rxrpc_put_call_slot(call);
del_timer_sync(&call->timer);
/* Make sure we don't get any more notifications */ /* Make sure we don't get any more notifications */
write_lock(&rx->recvmsg_lock); write_lock(&rx->recvmsg_lock);
...@@ -572,10 +569,6 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) ...@@ -572,10 +569,6 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn); _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
if (conn && !test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
rxrpc_disconnect_call(call);
if (call->security)
call->security->free_call_crypto(call);
_leave(""); _leave("");
} }
......
...@@ -997,8 +997,6 @@ void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -997,8 +997,6 @@ void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
*/ */
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb) void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{ {
struct rxrpc_connection *conn = call->conn;
switch (READ_ONCE(call->state)) { switch (READ_ONCE(call->state)) {
case RXRPC_CALL_SERVER_AWAIT_ACK: case RXRPC_CALL_SERVER_AWAIT_ACK:
rxrpc_call_completed(call); rxrpc_call_completed(call);
...@@ -1012,8 +1010,4 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -1012,8 +1010,4 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
} }
rxrpc_input_call_event(call, skb); rxrpc_input_call_event(call, skb);
spin_lock(&conn->bundle->channel_lock);
__rxrpc_disconnect_call(conn, call);
spin_unlock(&conn->bundle->channel_lock);
} }
...@@ -201,6 +201,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) ...@@ -201,6 +201,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
case RXRPC_CALL_CLIENT_RECV_REPLY: case RXRPC_CALL_CLIENT_RECV_REPLY:
__rxrpc_call_completed(call); __rxrpc_call_completed(call);
write_unlock(&call->state_lock); write_unlock(&call->state_lock);
rxrpc_poke_call(call, rxrpc_call_poke_complete);
break; break;
case RXRPC_CALL_SERVER_RECV_REQUEST: case RXRPC_CALL_SERVER_RECV_REQUEST:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment