Commit c7e86acf authored by David Howells, committed by David S. Miller

rxrpc: Fix lockup due to no error backoff after ack transmit error

If the network becomes (partially) unavailable, say by disabling IPv6, the
background ACK transmission routine can get itself into a tizzy by
proposing immediate ACK retransmission.  Since we're already in the call
event processor, that retransmission is attempted straight away, without
returning to the workqueue manager, so the processor just spins on the
failing transmission and ties up the CPU.

The condition should clear after a while when either the network comes back
or the call times out.

Fix this by:

 (1) When re-proposing an ACK on failed Tx, don't schedule it immediately.
     This will allow a certain amount of time to elapse before we try
     again.

 (2) Enforce a return to the workqueue manager after a certain number of
     iterations of the call processing loop.

 (3) Add a backoff delay that increases the delay on deferred ACKs by a
     jiffy per failed transmission, up to a limit of HZ.  The backoff delay
     is cleared on a successful return from kernel_sendmsg().  (A rough
     model of this is sketched after this list.)

 (4) Cancel calls immediately if the opening sendmsg fails.  The layer
     above can arrange retransmission or rotate to another server.
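
For illustration only, here is a rough user-space model of the backoff in
(1) and (3).  It is not the kernel code: HZ, the base expiry value and the
struct/function names here are stand-ins for the real ones in the patch
below.

#include <stdio.h>

#define HZ 100				/* pretend 100 jiffies per second */

struct call_model {
	unsigned short	tx_backoff;	/* extra delay owed to Tx failures */
};

/* Same shape as the rxrpc_tx_backoff() helper added below: one jiffy of
 * extra delay per failed transmission, capped at HZ, cleared on success.
 */
static void tx_backoff(struct call_model *call, int ret)
{
	if (ret < 0) {
		if (call->tx_backoff < HZ)
			call->tx_backoff++;
	} else {
		call->tx_backoff = 0;
	}
}

int main(void)
{
	struct call_model call = { 0 };
	unsigned long expiry = 2;	/* base deferred-ACK delay (jiffies) */
	int i;

	for (i = 0; i < 5; i++)
		tx_backoff(&call, -1);	/* five failed sends */

	/* The re-proposed ACK now waits expiry + backoff jiffies instead of
	 * being retried immediately.
	 */
	printf("ACK deferred by %lu jiffies\n", expiry + call.tx_backoff);

	tx_backoff(&call, 0);		/* a successful send clears the debt */
	printf("backoff after success: %u\n", (unsigned int)call.tx_backoff);
	return 0;
}

The point of the model is that the deferred-ACK deadline grows with each
failure, so a flapping route can no longer drive the event processor into
an immediate-retry loop.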

Fixes: 248f219c ("rxrpc: Rewrite the data and ack handling code")
Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 284fb78e
@@ -611,6 +611,7 @@ struct rxrpc_call {
 	 * not hard-ACK'd packet follows this.
 	 */
 	rxrpc_seq_t		tx_top;		/* Highest Tx slot allocated. */
+	u16			tx_backoff;	/* Delay to insert due to Tx failure */
 
 	/* TCP-style slow-start congestion control [RFC5681].  Since the SMSS
 	 * is fixed, we keep these numbers in terms of segments (ie. DATA
......
@@ -123,6 +123,7 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 		else
 			ack_at = expiry;
 
+		ack_at += READ_ONCE(call->tx_backoff);
 		ack_at += now;
 		if (time_before(ack_at, call->ack_at)) {
 			WRITE_ONCE(call->ack_at, ack_at);
@@ -311,6 +312,7 @@ void rxrpc_process_call(struct work_struct *work)
 		container_of(work, struct rxrpc_call, processor);
 	rxrpc_serial_t *send_ack;
 	unsigned long now, next, t;
+	unsigned int iterations = 0;
 
 	rxrpc_see_call(call);
@@ -319,6 +321,11 @@ void rxrpc_process_call(struct work_struct *work)
 	       call->debug_id, rxrpc_call_states[call->state], call->events);
 
 recheck_state:
+	/* Limit the number of times we do this before returning to the manager */
+	iterations++;
+	if (iterations > 5)
+		goto requeue;
+
 	if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
 		rxrpc_send_abort_packet(call);
 		goto recheck_state;
@@ -447,13 +454,16 @@ void rxrpc_process_call(struct work_struct *work)
 	rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart);
 
 	/* other events may have been raised since we started checking */
-	if (call->events && call->state < RXRPC_CALL_COMPLETE) {
-		__rxrpc_queue_call(call);
-		goto out;
-	}
+	if (call->events && call->state < RXRPC_CALL_COMPLETE)
+		goto requeue;
 
 out_put:
 	rxrpc_put_call(call, rxrpc_call_put);
 out:
 	_leave("");
+	return;
+
+requeue:
+	__rxrpc_queue_call(call);
+	goto out;
 }
......
@@ -34,6 +34,21 @@ struct rxrpc_abort_buffer {
 
 static const char rxrpc_keepalive_string[] = "";
 
+/*
+ * Increase Tx backoff on transmission failure and clear it on success.
+ */
+static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
+{
+	if (ret < 0) {
+		u16 tx_backoff = READ_ONCE(call->tx_backoff);
+
+		if (tx_backoff < HZ)
+			WRITE_ONCE(call->tx_backoff, tx_backoff + 1);
+	} else {
+		WRITE_ONCE(call->tx_backoff, 0);
+	}
+}
+
 /*
  * Arrange for a keepalive ping a certain time after we last transmitted.  This
  * lets the far side know we're still interested in this call and helps keep
@@ -210,6 +225,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr,
 				      rxrpc_tx_point_call_ack);
+	rxrpc_tx_backoff(call, ret);
 
 	if (call->state < RXRPC_CALL_COMPLETE) {
 		if (ret < 0) {
@@ -218,7 +234,7 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
 			rxrpc_propose_ACK(call, pkt->ack.reason,
 					  ntohs(pkt->ack.maxSkew),
 					  ntohl(pkt->ack.serial),
-					  true, true,
+					  false, true,
 					  rxrpc_propose_ack_retry_tx);
 		} else {
 			spin_lock_bh(&call->lock);
@@ -300,7 +316,7 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
 				      rxrpc_tx_point_call_abort);
+	rxrpc_tx_backoff(call, ret);
 
 	rxrpc_put_connection(conn);
 	return ret;
@@ -413,6 +429,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &whdr,
 				      rxrpc_tx_point_call_data_nofrag);
+	rxrpc_tx_backoff(call, ret);
 	if (ret == -EMSGSIZE)
 		goto send_fragmentable;
@@ -445,9 +462,18 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
 			rxrpc_reduce_call_timer(call, expect_rx_by, nowj,
 						rxrpc_timer_set_for_normal);
 		}
-	}
 
-	rxrpc_set_keepalive(call);
+		rxrpc_set_keepalive(call);
+	} else {
+		/* Cancel the call if the initial transmission fails,
+		 * particularly if that's due to network routing issues that
+		 * aren't going away anytime soon.  The layer above can arrange
+		 * the retransmission.
+		 */
+		if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags))
+			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+						  RX_USER_ABORT, ret);
+	}
 
 	_leave(" = %d [%u]", ret, call->peer->maxdata);
 	return ret;
@@ -506,6 +532,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
 	else
 		trace_rxrpc_tx_packet(call->debug_id, &whdr,
 				      rxrpc_tx_point_call_data_frag);
+	rxrpc_tx_backoff(call, ret);
 
 	up_write(&conn->params.local->defrag_sem);
 	goto done;
......