Commit 72f0c6fb authored by David Howells's avatar David Howells

rxrpc: Allocate ACK records at proposal and queue for transmission

Allocate rxrpc_txbuf records for ACKs and put onto a queue for the
transmitter thread to dispatch.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
parent 02a19356
......@@ -34,7 +34,8 @@
EM(rxrpc_local_new, "NEW") \
EM(rxrpc_local_processing, "PRO") \
EM(rxrpc_local_put, "PUT") \
E_(rxrpc_local_queued, "QUE")
EM(rxrpc_local_queued, "QUE") \
E_(rxrpc_local_tx_ack, "TAK")
#define rxrpc_peer_traces \
EM(rxrpc_peer_got, "GOT") \
......@@ -258,7 +259,9 @@
EM(rxrpc_txbuf_free, "FREE ") \
EM(rxrpc_txbuf_get_trans, "GET TRANS ") \
EM(rxrpc_txbuf_get_retrans, "GET RETRANS") \
EM(rxrpc_txbuf_put_ack_tx, "PUT ACK TX ") \
EM(rxrpc_txbuf_put_cleaned, "PUT CLEANED") \
EM(rxrpc_txbuf_put_nomem, "PUT NOMEM ") \
EM(rxrpc_txbuf_put_rotated, "PUT ROTATED") \
EM(rxrpc_txbuf_put_send_aborted, "PUT SEND-X ") \
EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
......@@ -1095,19 +1098,16 @@ TRACE_EVENT(rxrpc_rx_lose,
TRACE_EVENT(rxrpc_propose_ack,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_propose_ack_trace why,
u8 ack_reason, rxrpc_serial_t serial, bool immediate,
bool background, enum rxrpc_propose_ack_outcome outcome),
u8 ack_reason, rxrpc_serial_t serial,
enum rxrpc_propose_ack_outcome outcome),
TP_ARGS(call, why, ack_reason, serial, immediate, background,
outcome),
TP_ARGS(call, why, ack_reason, serial, outcome),
TP_STRUCT__entry(
__field(unsigned int, call )
__field(enum rxrpc_propose_ack_trace, why )
__field(rxrpc_serial_t, serial )
__field(u8, ack_reason )
__field(bool, immediate )
__field(bool, background )
__field(enum rxrpc_propose_ack_outcome, outcome )
),
......@@ -1116,21 +1116,44 @@ TRACE_EVENT(rxrpc_propose_ack,
__entry->why = why;
__entry->serial = serial;
__entry->ack_reason = ack_reason;
__entry->immediate = immediate;
__entry->background = background;
__entry->outcome = outcome;
),
TP_printk("c=%08x %s %s r=%08x i=%u b=%u%s",
TP_printk("c=%08x %s %s r=%08x%s",
__entry->call,
__print_symbolic(__entry->why, rxrpc_propose_ack_traces),
__print_symbolic(__entry->ack_reason, rxrpc_ack_names),
__entry->serial,
__entry->immediate,
__entry->background,
__print_symbolic(__entry->outcome, rxrpc_propose_ack_outcomes))
);
TRACE_EVENT(rxrpc_send_ack,
TP_PROTO(struct rxrpc_call *call, enum rxrpc_propose_ack_trace why,
u8 ack_reason, rxrpc_serial_t serial),
TP_ARGS(call, why, ack_reason, serial),
TP_STRUCT__entry(
__field(unsigned int, call )
__field(enum rxrpc_propose_ack_trace, why )
__field(rxrpc_serial_t, serial )
__field(u8, ack_reason )
),
TP_fast_assign(
__entry->call = call->debug_id;
__entry->why = why;
__entry->serial = serial;
__entry->ack_reason = ack_reason;
),
TP_printk("c=%08x %s %s r=%08x",
__entry->call,
__print_symbolic(__entry->why, rxrpc_propose_ack_traces),
__print_symbolic(__entry->ack_reason, rxrpc_ack_names),
__entry->serial)
);
TRACE_EVENT(rxrpc_retransmit,
TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, u8 annotation,
s64 expiry),
......
......@@ -292,6 +292,8 @@ struct rxrpc_local {
struct hlist_node link;
struct socket *socket; /* my UDP socket */
struct work_struct processor;
struct list_head ack_tx_queue; /* List of ACKs that need sending */
spinlock_t ack_tx_lock; /* ACK list lock */
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
struct sk_buff_head reject_queue; /* packets awaiting rejection */
......@@ -520,10 +522,8 @@ enum rxrpc_call_flag {
* Events that can be raised on a call.
*/
enum rxrpc_call_event {
RXRPC_CALL_EV_ACK, /* need to generate ACK */
RXRPC_CALL_EV_ABORT, /* need to generate abort */
RXRPC_CALL_EV_RESEND, /* Tx resend required */
RXRPC_CALL_EV_PING, /* Ping send required */
RXRPC_CALL_EV_EXPIRED, /* Expiry occurred */
RXRPC_CALL_EV_ACK_LOST, /* ACK may be lost, send ping */
};
......@@ -782,13 +782,20 @@ struct rxrpc_txbuf {
#define RXRPC_TXBUF_LAST 2 /* Set if last packet in Tx phase */
#define RXRPC_TXBUF_RESENT 3 /* Set if has been resent */
#define RXRPC_TXBUF_RETRANS 4 /* Set if should be retransmitted */
u8 /*enum rxrpc_propose_ack_trace*/ ack_why; /* If ack, why */
struct {
/* The packet for encrypting and DMA'ing. We align it such
* that data[] aligns correctly for any crypto blocksize.
*/
u8 pad[64 - sizeof(struct rxrpc_wire_header)];
struct rxrpc_wire_header wire; /* Network-ready header */
u8 data[RXRPC_JUMBO_DATALEN]; /* Data packet */
union {
u8 data[RXRPC_JUMBO_DATALEN]; /* Data packet */
struct {
struct rxrpc_ackpacket ack;
u8 acks[0];
};
};
} __aligned(64);
};
......@@ -824,8 +831,10 @@ int rxrpc_user_charge_accept(struct rxrpc_sock *, unsigned long);
/*
* call_event.c
*/
void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool, bool,
enum rxrpc_propose_ack_trace);
void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
enum rxrpc_propose_ack_trace why);
void rxrpc_send_ACK(struct rxrpc_call *, u8, rxrpc_serial_t, enum rxrpc_propose_ack_trace);
void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, enum rxrpc_propose_ack_trace);
void rxrpc_process_call(struct work_struct *);
void rxrpc_reduce_call_timer(struct rxrpc_call *call,
......@@ -1030,7 +1039,7 @@ static inline struct rxrpc_net *rxrpc_net(struct net *net)
/*
* output.c
*/
int rxrpc_send_ack_packet(struct rxrpc_call *, bool, rxrpc_serial_t *);
void rxrpc_transmit_ack_packets(struct rxrpc_local *);
int rxrpc_send_abort_packet(struct rxrpc_call *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *, bool);
void rxrpc_reject_packets(struct rxrpc_local *);
......
......@@ -248,9 +248,8 @@ static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb)
if (call->peer->rtt_count < 3 ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now))
rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
true, true,
rxrpc_propose_ack_ping_for_params);
rxrpc_send_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
rxrpc_propose_ack_ping_for_params);
}
/*
......
......@@ -20,46 +20,39 @@
/*
* Propose a PING ACK be sent.
*/
static void rxrpc_propose_ping(struct rxrpc_call *call,
bool immediate, bool background)
void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
enum rxrpc_propose_ack_trace why)
{
if (immediate) {
if (background &&
!test_and_set_bit(RXRPC_CALL_EV_PING, &call->events))
rxrpc_queue_call(call);
} else {
unsigned long now = jiffies;
unsigned long ping_at = now + rxrpc_idle_ack_delay;
unsigned long now = jiffies;
unsigned long ping_at = now + rxrpc_idle_ack_delay;
if (time_before(ping_at, call->ping_at)) {
WRITE_ONCE(call->ping_at, ping_at);
rxrpc_reduce_call_timer(call, ping_at, now,
rxrpc_timer_set_for_ping);
}
spin_lock_bh(&call->lock);
if (time_before(ping_at, call->ping_at)) {
rxrpc_inc_stat(call->rxnet, stat_tx_acks[RXRPC_ACK_PING]);
WRITE_ONCE(call->ping_at, ping_at);
rxrpc_reduce_call_timer(call, ping_at, now,
rxrpc_timer_set_for_ping);
trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial,
rxrpc_propose_ack_use);
}
spin_unlock_bh(&call->lock);
}
/*
* propose an ACK be sent
*/
static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
u32 serial, bool immediate, bool background,
enum rxrpc_propose_ack_trace why)
u32 serial, enum rxrpc_propose_ack_trace why)
{
enum rxrpc_propose_ack_outcome outcome = rxrpc_propose_ack_use;
unsigned long expiry = rxrpc_soft_ack_delay;
unsigned long now = jiffies, ack_at;
s8 prior = rxrpc_ack_priority[ack_reason];
rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
/* Pings are handled specially because we don't want to accidentally
* lose a ping response by subsuming it into a ping.
*/
if (ack_reason == RXRPC_ACK_PING) {
rxrpc_propose_ping(call, immediate, background);
goto trace;
}
/* Update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
* numbers, but we don't alter the timeout.
*/
......@@ -71,8 +64,6 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
outcome = rxrpc_propose_ack_update;
call->ackr_serial = serial;
}
if (!immediate)
goto trace;
} else if (prior > rxrpc_ack_priority[call->ackr_reason]) {
call->ackr_reason = ack_reason;
call->ackr_serial = serial;
......@@ -84,8 +75,6 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
case RXRPC_ACK_REQUESTED:
if (rxrpc_requested_ack_delay < expiry)
expiry = rxrpc_requested_ack_delay;
if (serial == 1)
immediate = false;
break;
case RXRPC_ACK_DELAY:
......@@ -99,52 +88,89 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
break;
default:
immediate = true;
break;
WARN_ON(1);
return;
}
if (test_bit(RXRPC_CALL_EV_ACK, &call->events)) {
_debug("already scheduled");
} else if (immediate || expiry == 0) {
_debug("immediate ACK %lx", call->events);
if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events) &&
background)
rxrpc_queue_call(call);
} else {
unsigned long now = jiffies, ack_at;
if (call->peer->srtt_us != 0)
ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3);
else
ack_at = expiry;
ack_at += READ_ONCE(call->tx_backoff);
ack_at += now;
if (time_before(ack_at, call->ack_at)) {
WRITE_ONCE(call->ack_at, ack_at);
rxrpc_reduce_call_timer(call, ack_at, now,
rxrpc_timer_set_for_ack);
}
if (call->peer->srtt_us != 0)
ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3);
else
ack_at = expiry;
ack_at += READ_ONCE(call->tx_backoff);
ack_at += now;
if (time_before(ack_at, call->ack_at)) {
WRITE_ONCE(call->ack_at, ack_at);
rxrpc_reduce_call_timer(call, ack_at, now,
rxrpc_timer_set_for_ack);
}
trace:
trace_rxrpc_propose_ack(call, why, ack_reason, serial, immediate,
background, outcome);
trace_rxrpc_propose_ack(call, why, ack_reason, serial, outcome);
}
/*
* propose an ACK be sent, locking the call structure
*/
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
u32 serial, bool immediate, bool background,
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason, u32 serial,
enum rxrpc_propose_ack_trace why)
{
spin_lock_bh(&call->lock);
__rxrpc_propose_ACK(call, ack_reason, serial,
immediate, background, why);
__rxrpc_propose_ACK(call, ack_reason, serial, why);
spin_unlock_bh(&call->lock);
}
/*
* Queue an ACK for immediate transmission.
*/
void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
{
struct rxrpc_local *local = call->conn->params.local;
struct rxrpc_txbuf *txb;
if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
return;
rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
in_softirq() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
if (!txb) {
kleave(" = -ENOMEM");
return;
}
txb->ack_why = why;
txb->wire.seq = 0;
txb->wire.type = RXRPC_PACKET_TYPE_ACK;
txb->wire.flags |= RXRPC_SLOW_START_OK;
txb->ack.bufferSpace = 0;
txb->ack.maxSkew = 0;
txb->ack.firstPacket = 0;
txb->ack.previousPacket = 0;
txb->ack.serial = htonl(serial);
txb->ack.reason = ack_reason;
txb->ack.nAcks = 0;
if (!rxrpc_try_get_call(call, rxrpc_call_got)) {
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_nomem);
return;
}
spin_lock_bh(&local->ack_tx_lock);
list_add_tail(&txb->tx_link, &local->ack_tx_queue);
spin_unlock_bh(&local->ack_tx_lock);
trace_rxrpc_send_ack(call, why, ack_reason, serial);
if (in_task()) {
rxrpc_transmit_ack_packets(call->peer->local);
} else {
rxrpc_get_local(local);
rxrpc_queue_local(local);
}
}
/*
* Handle congestion being detected by the retransmit timeout.
*/
......@@ -230,9 +256,8 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
ack_ts = ktime_sub(now, call->acks_latest_ts);
if (ktime_to_us(ack_ts) < (call->peer->srtt_us >> 3))
goto out;
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
rxrpc_send_ack_packet(call, true, NULL);
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_lost_ack);
goto out;
}
......@@ -291,9 +316,10 @@ void rxrpc_process_call(struct work_struct *work)
{
struct rxrpc_call *call =
container_of(work, struct rxrpc_call, processor);
rxrpc_serial_t *send_ack;
unsigned long now, next, t;
unsigned int iterations = 0;
rxrpc_serial_t ackr_serial;
u8 ackr_reason;
rxrpc_see_call(call);
......@@ -342,7 +368,15 @@ void rxrpc_process_call(struct work_struct *work)
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now);
cmpxchg(&call->ack_at, t, now + MAX_JIFFY_OFFSET);
set_bit(RXRPC_CALL_EV_ACK, &call->events);
spin_lock_bh(&call->lock);
ackr_reason = call->ackr_reason;
ackr_serial = call->ackr_serial;
call->ackr_reason = 0;
call->ackr_serial = 0;
spin_unlock_bh(&call->lock);
if (ackr_reason)
rxrpc_send_ACK(call, ackr_reason, ackr_serial,
rxrpc_propose_ack_ping_for_lost_ack);
}
t = READ_ONCE(call->ack_lost_at);
......@@ -356,16 +390,16 @@ void rxrpc_process_call(struct work_struct *work)
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, true,
rxrpc_propose_ack_ping_for_keepalive);
set_bit(RXRPC_CALL_EV_PING, &call->events);
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_keepalive);
}
t = READ_ONCE(call->ping_at);
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now);
cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET);
set_bit(RXRPC_CALL_EV_PING, &call->events);
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_keepalive);
}
t = READ_ONCE(call->resend_at);
......@@ -388,25 +422,10 @@ void rxrpc_process_call(struct work_struct *work)
goto recheck_state;
}
send_ack = NULL;
if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) {
call->acks_lost_top = call->tx_top;
rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
send_ack = &call->acks_lost_ping;
}
if (test_and_clear_bit(RXRPC_CALL_EV_ACK, &call->events) ||
send_ack) {
if (call->ackr_reason) {
rxrpc_send_ack_packet(call, false, send_ack);
goto recheck_state;
}
}
if (test_and_clear_bit(RXRPC_CALL_EV_PING, &call->events)) {
rxrpc_send_ack_packet(call, true, NULL);
goto recheck_state;
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
rxrpc_propose_ack_ping_for_lost_ack);
}
if (test_and_clear_bit(RXRPC_CALL_EV_RESEND, &call->events) &&
......
......@@ -398,8 +398,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
unsigned int j, nr_subpackets, nr_unacked = 0;
rxrpc_serial_t serial = sp->hdr.serial, ack_serial = serial;
rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack;
bool immediate_ack = false, jumbo_bad = false;
u8 ack = 0;
bool jumbo_bad = false;
_enter("{%u,%u},{%u,%u}",
call->rx_hard_ack, call->rx_top, skb->len, seq0);
......@@ -447,9 +446,9 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
nr_subpackets = sp->nr_subpackets;
if (nr_subpackets > 1) {
if (call->nr_jumbo_bad > 3) {
ack = RXRPC_ACK_NOSPACE;
ack_serial = serial;
goto ack;
rxrpc_send_ACK(call, RXRPC_ACK_NOSPACE, serial,
rxrpc_propose_ack_input_data);
goto unlock;
}
}
......@@ -459,6 +458,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
bool terminal = (j == nr_subpackets - 1);
bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST);
bool acked = false;
u8 flags, annotation = j;
_proto("Rx DATA+%u %%%u { #%x t=%u l=%u }",
......@@ -488,25 +488,22 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
if (before_eq(seq, hard_ack)) {
ack = RXRPC_ACK_DUPLICATE;
ack_serial = serial;
rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
rxrpc_propose_ack_input_data);
continue;
}
if (call->rxtx_buffer[ix]) {
rxrpc_input_dup_data(call, seq, nr_subpackets > 1,
&jumbo_bad);
if (ack != RXRPC_ACK_DUPLICATE) {
ack = RXRPC_ACK_DUPLICATE;
ack_serial = serial;
}
immediate_ack = true;
rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
rxrpc_propose_ack_input_data);
continue;
}
if (after(seq, hard_ack + call->rx_winsize)) {
ack = RXRPC_ACK_EXCEEDS_WINDOW;
ack_serial = serial;
rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
rxrpc_propose_ack_input_data);
if (flags & RXRPC_JUMBO_PACKET) {
if (!jumbo_bad) {
call->nr_jumbo_bad++;
......@@ -514,12 +511,13 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
}
}
goto ack;
goto unlock;
}
if (flags & RXRPC_REQUEST_ACK && !ack) {
ack = RXRPC_ACK_REQUESTED;
ack_serial = serial;
if (flags & RXRPC_REQUEST_ACK) {
rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
rxrpc_propose_ack_input_data);
acked = true;
}
if (after(seq0, call->ackr_highest_seq))
......@@ -542,11 +540,11 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
smp_store_release(&call->rx_top, seq);
} else if (before(seq, call->rx_top)) {
/* Send an immediate ACK if we fill in a hole */
if (!ack) {
ack = RXRPC_ACK_DELAY;
ack_serial = serial;
if (!acked) {
rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
rxrpc_propose_ack_input_data);
acked = true;
}
immediate_ack = true;
}
if (terminal) {
......@@ -558,14 +556,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
sp = NULL;
}
nr_unacked++;
if (last) {
set_bit(RXRPC_CALL_RX_LAST, &call->flags);
if (!ack) {
ack = RXRPC_ACK_DELAY;
ack_serial = serial;
}
trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
} else {
trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
......@@ -574,32 +566,30 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
if (after_eq(seq, call->rx_expect_next)) {
if (after(seq, call->rx_expect_next)) {
_net("OOS %u > %u", seq, call->rx_expect_next);
ack = RXRPC_ACK_OUT_OF_SEQUENCE;
ack_serial = serial;
rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
rxrpc_propose_ack_input_data);
acked = true;
}
call->rx_expect_next = seq + 1;
}
if (!ack)
if (!acked) {
nr_unacked++;
ack_serial = serial;
}
}
ack:
if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2 && !ack)
ack = RXRPC_ACK_IDLE;
if (ack)
rxrpc_propose_ACK(call, ack, ack_serial,
immediate_ack, true,
rxrpc_propose_ack_input_data);
unlock:
if (atomic_add_return(nr_unacked, &call->ackr_nr_unacked) > 2)
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, ack_serial,
rxrpc_propose_ack_input_data);
else
rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
false, true,
rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, ack_serial,
rxrpc_propose_ack_input_data);
trace_rxrpc_notify_socket(call->debug_id, serial);
rxrpc_notify_socket(call);
unlock:
spin_unlock(&call->input_lock);
rxrpc_free_skb(skb, rxrpc_skb_freed);
_leave(" [queued]");
......@@ -893,13 +883,11 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
if (buf.ack.reason == RXRPC_ACK_PING) {
_proto("Rx ACK %%%u PING Request", ack_serial);
rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
ack_serial, true, true,
rxrpc_propose_ack_respond_to_ping);
rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
rxrpc_propose_ack_respond_to_ping);
} else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED,
ack_serial, true, true,
rxrpc_propose_ack_respond_to_ack);
rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
rxrpc_propose_ack_respond_to_ack);
}
/* If we get an EXCEEDS_WINDOW ACK from the server, it probably
......@@ -1011,9 +999,8 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
RXRPC_TX_ANNO_LAST &&
summary.nr_acks == call->tx_top - hard_ack &&
rxrpc_is_client_call(call))
rxrpc_propose_ACK(call, RXRPC_ACK_PING, ack_serial,
false, true,
rxrpc_propose_ack_ping_for_lost_reply);
rxrpc_propose_ping(call, ack_serial,
rxrpc_propose_ack_ping_for_lost_reply);
rxrpc_congestion_management(call, skb, &summary, acked_serial);
out:
......
......@@ -97,6 +97,8 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
local->rxnet = rxnet;
INIT_HLIST_NODE(&local->link);
INIT_WORK(&local->processor, rxrpc_local_processor);
INIT_LIST_HEAD(&local->ack_tx_queue);
spin_lock_init(&local->ack_tx_lock);
init_rwsem(&local->defrag_sem);
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
......@@ -432,6 +434,11 @@ static void rxrpc_local_processor(struct work_struct *work)
break;
}
if (!list_empty(&local->ack_tx_queue)) {
rxrpc_transmit_ack_packets(local);
again = true;
}
if (!skb_queue_empty(&local->reject_queue)) {
rxrpc_reject_packets(local);
again = true;
......
......@@ -16,14 +16,6 @@
#include <net/udp.h>
#include "ar-internal.h"
struct rxrpc_ack_buffer {
struct rxrpc_wire_header whdr;
struct rxrpc_ackpacket ack;
u8 acks[255];
u8 pad[3];
struct rxrpc_ackinfo ackinfo;
};
extern int udpv6_sendmsg(struct sock *sk, struct msghdr *msg, size_t len);
static ssize_t do_udp_sendmsg(struct socket *sk, struct msghdr *msg, size_t len)
......@@ -82,22 +74,21 @@ static void rxrpc_set_keepalive(struct rxrpc_call *call)
*/
static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
struct rxrpc_call *call,
struct rxrpc_ack_buffer *pkt,
struct rxrpc_txbuf *txb,
rxrpc_seq_t *_hard_ack,
rxrpc_seq_t *_top,
u8 reason)
rxrpc_seq_t *_top)
{
rxrpc_serial_t serial;
struct rxrpc_ackinfo ackinfo;
unsigned int tmp;
rxrpc_seq_t hard_ack, top, seq;
int ix;
u32 mtu, jmax;
u8 *ackp = pkt->acks;
u8 *ackp = txb->acks;
tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
if (!tmp && (reason == RXRPC_ACK_DELAY ||
reason == RXRPC_ACK_IDLE)) {
if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY ||
txb->ack.reason == RXRPC_ACK_IDLE)) {
rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip);
return 0;
}
......@@ -105,24 +96,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
/* Barrier against rxrpc_input_data(). */
serial = call->ackr_serial;
hard_ack = READ_ONCE(call->rx_hard_ack);
top = smp_load_acquire(&call->rx_top);
*_hard_ack = hard_ack;
*_top = top;
pkt->ack.bufferSpace = htons(0);
pkt->ack.maxSkew = htons(0);
pkt->ack.firstPacket = htonl(hard_ack + 1);
pkt->ack.previousPacket = htonl(call->ackr_highest_seq);
pkt->ack.serial = htonl(serial);
pkt->ack.reason = reason;
pkt->ack.nAcks = top - hard_ack;
if (reason == RXRPC_ACK_PING)
pkt->whdr.flags |= RXRPC_REQUEST_ACK;
txb->ack.firstPacket = htonl(hard_ack + 1);
txb->ack.previousPacket = htonl(call->ackr_highest_seq);
txb->ack.nAcks = top - hard_ack;
if (after(top, hard_ack)) {
if (txb->ack.nAcks) {
seq = hard_ack + 1;
do {
ix = seq & RXRPC_RXTX_BUFF_MASK;
......@@ -137,15 +120,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
mtu = conn->params.peer->if_mtu;
mtu -= conn->params.peer->hdrsize;
jmax = (call->nr_jumbo_bad > 3) ? 1 : rxrpc_rx_jumbo_max;
pkt->ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
pkt->ackinfo.maxMTU = htonl(mtu);
pkt->ackinfo.rwind = htonl(call->rx_winsize);
pkt->ackinfo.jumbo_max = htonl(jmax);
ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
ackinfo.maxMTU = htonl(mtu);
ackinfo.rwind = htonl(call->rx_winsize);
ackinfo.jumbo_max = htonl(jmax);
*ackp++ = 0;
*ackp++ = 0;
*ackp++ = 0;
return top - hard_ack + 3;
memcpy(ackp, &ackinfo, sizeof(ackinfo));
return top - hard_ack + 3 + sizeof(ackinfo);
}
/*
......@@ -194,26 +178,21 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
/*
* Send an ACK call packet.
*/
int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
rxrpc_serial_t *_serial)
static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *txb)
{
struct rxrpc_connection *conn;
struct rxrpc_ack_buffer *pkt;
struct rxrpc_call *call = txb->call;
struct msghdr msg;
struct kvec iov[2];
struct kvec iov[1];
rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top;
size_t len, n;
int ret, rtt_slot = -1;
u8 reason;
if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
return -ECONNRESET;
pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
if (!pkt)
return -ENOMEM;
conn = call->conn;
msg.msg_name = &call->peer->srx.transport;
......@@ -222,85 +201,93 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
msg.msg_controllen = 0;
msg.msg_flags = 0;
pkt->whdr.epoch = htonl(conn->proto.epoch);
pkt->whdr.cid = htonl(call->cid);
pkt->whdr.callNumber = htonl(call->call_id);
pkt->whdr.seq = 0;
pkt->whdr.type = RXRPC_PACKET_TYPE_ACK;
pkt->whdr.flags = RXRPC_SLOW_START_OK | conn->out_clientflag;
pkt->whdr.userStatus = 0;
pkt->whdr.securityIndex = call->security_ix;
pkt->whdr._rsvd = 0;
pkt->whdr.serviceId = htons(call->service_id);
if (txb->ack.reason == RXRPC_ACK_PING)
txb->wire.flags |= RXRPC_REQUEST_ACK;
spin_lock_bh(&call->lock);
if (ping) {
reason = RXRPC_ACK_PING;
} else {
reason = call->ackr_reason;
if (!call->ackr_reason) {
spin_unlock_bh(&call->lock);
ret = 0;
goto out;
}
call->ackr_reason = 0;
}
n = rxrpc_fill_out_ack(conn, call, pkt, &hard_ack, &top, reason);
n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top);
spin_unlock_bh(&call->lock);
if (n == 0) {
kfree(pkt);
return 0;
}
iov[0].iov_base = pkt;
iov[0].iov_len = sizeof(pkt->whdr) + sizeof(pkt->ack) + n;
iov[1].iov_base = &pkt->ackinfo;
iov[1].iov_len = sizeof(pkt->ackinfo);
len = iov[0].iov_len + iov[1].iov_len;
iov[0].iov_base = &txb->wire;
iov[0].iov_len = sizeof(txb->wire) + sizeof(txb->ack) + n;
len = iov[0].iov_len;
serial = atomic_inc_return(&conn->serial);
pkt->whdr.serial = htonl(serial);
txb->wire.serial = htonl(serial);
trace_rxrpc_tx_ack(call->debug_id, serial,
ntohl(pkt->ack.firstPacket),
ntohl(pkt->ack.serial),
pkt->ack.reason, pkt->ack.nAcks);
if (_serial)
*_serial = serial;
ntohl(txb->ack.firstPacket),
ntohl(txb->ack.serial), txb->ack.reason, txb->ack.nAcks);
if (txb->ack_why == rxrpc_propose_ack_ping_for_lost_ack)
call->acks_lost_ping = serial;
if (ping)
if (txb->ack.reason == RXRPC_ACK_PING)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_ping);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
ret = do_udp_sendmsg(conn->params.local->socket, &msg, len);
call->peer->last_tx_at = ktime_get_seconds();
if (ret < 0)
trace_rxrpc_tx_fail(call->debug_id, serial, ret,
rxrpc_tx_point_call_ack);
else
trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr,
trace_rxrpc_tx_packet(call->debug_id, &txb->wire,
rxrpc_tx_point_call_ack);
rxrpc_tx_backoff(call, ret);
if (call->state < RXRPC_CALL_COMPLETE) {
if (ret < 0) {
if (ret < 0)
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
rxrpc_propose_ACK(call, pkt->ack.reason,
ntohl(pkt->ack.serial),
false, true,
rxrpc_propose_ack_retry_tx);
}
rxrpc_set_keepalive(call);
}
out:
kfree(pkt);
return ret;
}
/*
* ACK transmitter for a local endpoint. The UDP socket locks around each
* transmission, so we can only transmit one packet at a time, ACK, DATA or
* otherwise.
*/
void rxrpc_transmit_ack_packets(struct rxrpc_local *local)
{
LIST_HEAD(queue);
int ret;
trace_rxrpc_local(local->debug_id, rxrpc_local_tx_ack,
refcount_read(&local->ref), NULL);
if (list_empty(&local->ack_tx_queue))
return;
spin_lock_bh(&local->ack_tx_lock);
list_splice_tail_init(&local->ack_tx_queue, &queue);
spin_unlock_bh(&local->ack_tx_lock);
while (!list_empty(&queue)) {
struct rxrpc_txbuf *txb =
list_entry(queue.next, struct rxrpc_txbuf, tx_link);
ret = rxrpc_send_ack_packet(local, txb);
if (ret < 0 && ret != -ECONNRESET) {
spin_lock_bh(&local->ack_tx_lock);
list_splice_init(&queue, &local->ack_tx_queue);
spin_unlock_bh(&local->ack_tx_lock);
break;
}
list_del_init(&txb->tx_link);
rxrpc_put_call(txb->call, rxrpc_call_put);
rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
}
}
/*
* Send an ABORT call packet.
*/
......
......@@ -189,7 +189,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
rxrpc_propose_ack_terminal_ack);
//rxrpc_send_ack_packet(call, false, NULL);
}
......@@ -206,7 +206,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
write_unlock_bh(&call->state_lock);
rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
rxrpc_propose_ack_processing_op);
break;
default:
......@@ -259,12 +259,11 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
rxrpc_end_rx_phase(call, serial);
} else {
/* Check to see if there's an ACK that needs sending. */
if (atomic_inc_return(&call->ackr_nr_consumed) > 2)
rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial,
true, false,
rxrpc_propose_ack_rotate_rx);
if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
rxrpc_send_ack_packet(call, false, NULL);
if (atomic_inc_return(&call->ackr_nr_consumed) > 2) {
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
rxrpc_propose_ack_rotate_rx);
rxrpc_transmit_ack_packets(call->peer->local);
}
}
}
......@@ -363,10 +362,6 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
unsigned int rx_pkt_offset, rx_pkt_len;
int ix, copy, ret = -EAGAIN, ret2;
if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
call->ackr_reason)
rxrpc_send_ack_packet(call, false, NULL);
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;
rx_pkt_last = call->rx_pkt_last;
......@@ -389,6 +384,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
if (!skb) {
trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
rx_pkt_offset, rx_pkt_len, 0);
rxrpc_transmit_ack_packets(call->peer->local);
break;
}
smp_rmb();
......@@ -604,6 +600,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
if (ret == -EAGAIN)
ret = 0;
rxrpc_transmit_ack_packets(call->peer->local);
if (after(call->rx_top, call->rx_hard_ack) &&
call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
rxrpc_notify_socket(call);
......@@ -734,17 +731,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
read_phase_complete:
ret = 1;
out:
switch (call->ackr_reason) {
case RXRPC_ACK_IDLE:
break;
case RXRPC_ACK_DELAY:
if (ret != -EAGAIN)
break;
fallthrough;
default:
rxrpc_send_ack_packet(call, false, NULL);
}
rxrpc_transmit_ack_packets(call->peer->local);
if (_service)
*_service = call->service_id;
mutex_unlock(&call->user_mutex);
......
......@@ -332,9 +332,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
rxrpc_see_skb(skb, rxrpc_skb_seen);
do {
/* Check to see if there's a ping ACK to reply to. */
if (call->ackr_reason == RXRPC_ACK_PING_RESPONSE)
rxrpc_send_ack_packet(call, false, NULL);
rxrpc_transmit_ack_packets(call->peer->local);
if (!skb) {
size_t remain, bufsize, chunk, offset;
......
......@@ -33,6 +33,7 @@ struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
txb->len = 0;
txb->offset = 0;
txb->flags = 0;
txb->ack_why = 0;
txb->seq = call->tx_top + 1;
txb->wire.epoch = htonl(call->conn->proto.epoch);
txb->wire.cid = htonl(call->cid);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment