Commit e2de6c40 authored by David Howells's avatar David Howells

rxrpc: Use info in skbuff instead of reparsing a jumbo packet

Use the information now cached in the skbuff private data to avoid the need
to reparse a jumbo packet.  We can find all the subpackets by dead
reckoning, so it's only necessary to note how many there are, whether the
last one is flagged as LAST_PACKET and whether any have the REQUEST_ACK
flag set.

This is necessary as once recvmsg() can see the packet, it can start
modifying it, such as doing in-place decryption.

Fixes: 248f219c ("rxrpc: Rewrite the data and ack handling code")
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent c3c9e3df
...@@ -617,8 +617,7 @@ struct rxrpc_call { ...@@ -617,8 +617,7 @@ struct rxrpc_call {
#define RXRPC_TX_ANNO_LAST 0x04 #define RXRPC_TX_ANNO_LAST 0x04
#define RXRPC_TX_ANNO_RESENT 0x08 #define RXRPC_TX_ANNO_RESENT 0x08
#define RXRPC_RX_ANNO_JUMBO 0x3f /* Jumbo subpacket number + 1 if not zero */ #define RXRPC_RX_ANNO_SUBPACKET 0x3f /* Subpacket number in jumbogram */
#define RXRPC_RX_ANNO_JLAST 0x40 /* Set if last element of a jumbo packet */
#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */ #define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */
rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
* not hard-ACK'd packet follows this. * not hard-ACK'd packet follows this.
......
...@@ -405,10 +405,10 @@ static bool rxrpc_validate_data(struct sk_buff *skb) ...@@ -405,10 +405,10 @@ static bool rxrpc_validate_data(struct sk_buff *skb)
* (that information is encoded in the ACK packet). * (that information is encoded in the ACK packet).
*/ */
static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq, static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
u8 annotation, bool *_jumbo_bad) bool is_jumbo, bool *_jumbo_bad)
{ {
/* Discard normal packets that are duplicates. */ /* Discard normal packets that are duplicates. */
if (annotation == 0) if (is_jumbo)
return; return;
/* Skip jumbo subpackets that are duplicates. When we've had three or /* Skip jumbo subpackets that are duplicates. When we've had three or
...@@ -428,19 +428,17 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -428,19 +428,17 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{ {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
enum rxrpc_call_state state; enum rxrpc_call_state state;
unsigned int offset = sizeof(struct rxrpc_wire_header); unsigned int j;
unsigned int ix;
rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0; rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0;
rxrpc_seq_t seq = sp->hdr.seq, hard_ack; rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack;
bool immediate_ack = false, jumbo_bad = false, queued; bool immediate_ack = false, jumbo_bad = false;
u16 len; u8 ack = 0;
u8 ack = 0, flags, annotation = 0;
_enter("{%u,%u},{%u,%u}", _enter("{%u,%u},{%u,%u}",
call->rx_hard_ack, call->rx_top, skb->len, seq); call->rx_hard_ack, call->rx_top, skb->len, seq0);
_proto("Rx DATA %%%u { #%u f=%02x }", _proto("Rx DATA %%%u { #%u f=%02x n=%u }",
sp->hdr.serial, seq, sp->hdr.flags); sp->hdr.serial, seq0, sp->hdr.flags, sp->nr_subpackets);
state = READ_ONCE(call->state); state = READ_ONCE(call->state);
if (state >= RXRPC_CALL_COMPLETE) if (state >= RXRPC_CALL_COMPLETE)
...@@ -469,137 +467,136 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -469,137 +467,136 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
!rxrpc_receiving_reply(call)) !rxrpc_receiving_reply(call))
goto unlock; goto unlock;
call->ackr_prev_seq = seq; call->ackr_prev_seq = seq0;
hard_ack = READ_ONCE(call->rx_hard_ack); hard_ack = READ_ONCE(call->rx_hard_ack);
if (after(seq, hard_ack + call->rx_winsize)) {
ack = RXRPC_ACK_EXCEEDS_WINDOW;
ack_serial = serial;
goto ack;
}
flags = sp->hdr.flags; if (sp->nr_subpackets > 1) {
if (flags & RXRPC_JUMBO_PACKET) {
if (call->nr_jumbo_bad > 3) { if (call->nr_jumbo_bad > 3) {
ack = RXRPC_ACK_NOSPACE; ack = RXRPC_ACK_NOSPACE;
ack_serial = serial; ack_serial = serial;
goto ack; goto ack;
} }
annotation = 1;
} }
next_subpacket: for (j = 0; j < sp->nr_subpackets; j++) {
queued = false; rxrpc_serial_t serial = sp->hdr.serial + j;
ix = seq & RXRPC_RXTX_BUFF_MASK; rxrpc_seq_t seq = seq0 + j;
len = skb->len; unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
if (flags & RXRPC_JUMBO_PACKET) bool terminal = (j == sp->nr_subpackets - 1);
len = RXRPC_JUMBO_DATALEN; bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST);
u8 flags, annotation = j;
if (flags & RXRPC_LAST_PACKET) {
if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && _proto("Rx DATA+%u %%%u { #%x t=%u l=%u }",
seq != call->rx_top) { j, serial, seq, terminal, last);
rxrpc_proto_abort("LSN", call, seq);
goto unlock; if (last) {
} if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
} else { seq != call->rx_top) {
if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && rxrpc_proto_abort("LSN", call, seq);
after_eq(seq, call->rx_top)) { goto unlock;
rxrpc_proto_abort("LSA", call, seq); }
goto unlock; } else {
if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
after_eq(seq, call->rx_top)) {
rxrpc_proto_abort("LSA", call, seq);
goto unlock;
}
} }
}
trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
if (before_eq(seq, hard_ack)) {
ack = RXRPC_ACK_DUPLICATE;
ack_serial = serial;
goto skip;
}
if (flags & RXRPC_REQUEST_ACK && !ack) { flags = 0;
ack = RXRPC_ACK_REQUESTED; if (last)
ack_serial = serial; flags |= RXRPC_LAST_PACKET;
} if (!terminal)
flags |= RXRPC_JUMBO_PACKET;
if (test_bit(j, sp->rx_req_ack))
flags |= RXRPC_REQUEST_ACK;
trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
if (call->rxtx_buffer[ix]) { if (before_eq(seq, hard_ack)) {
rxrpc_input_dup_data(call, seq, annotation, &jumbo_bad);
if (ack != RXRPC_ACK_DUPLICATE) {
ack = RXRPC_ACK_DUPLICATE; ack = RXRPC_ACK_DUPLICATE;
ack_serial = serial; ack_serial = serial;
continue;
} }
immediate_ack = true;
goto skip;
}
/* Queue the packet. We use a couple of memory barriers here as need
* to make sure that rx_top is perceived to be set after the buffer
* pointer and that the buffer pointer is set after the annotation and
* the skb data.
*
* Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
* and also rxrpc_fill_out_ack().
*/
rxrpc_get_skb(skb, rxrpc_skb_rx_got);
call->rxtx_annotations[ix] = annotation;
smp_wmb();
call->rxtx_buffer[ix] = skb;
if (after(seq, call->rx_top)) {
smp_store_release(&call->rx_top, seq);
} else if (before(seq, call->rx_top)) {
/* Send an immediate ACK if we fill in a hole */
if (!ack) {
ack = RXRPC_ACK_DELAY;
ack_serial = serial;
}
immediate_ack = true;
}
if (flags & RXRPC_LAST_PACKET) {
set_bit(RXRPC_CALL_RX_LAST, &call->flags);
trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
} else {
trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
}
queued = true;
if (after_eq(seq, call->rx_expect_next)) { if (call->rxtx_buffer[ix]) {
if (after(seq, call->rx_expect_next)) { rxrpc_input_dup_data(call, seq, sp->nr_subpackets > 1,
_net("OOS %u > %u", seq, call->rx_expect_next); &jumbo_bad);
ack = RXRPC_ACK_OUT_OF_SEQUENCE; if (ack != RXRPC_ACK_DUPLICATE) {
ack_serial = serial; ack = RXRPC_ACK_DUPLICATE;
ack_serial = serial;
}
immediate_ack = true;
continue;
} }
call->rx_expect_next = seq + 1;
}
skip:
offset += len;
if (flags & RXRPC_JUMBO_PACKET) {
if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
rxrpc_proto_abort("XJF", call, seq);
goto unlock;
}
offset += sizeof(struct rxrpc_jumbo_header);
seq++;
serial++;
annotation++;
if (flags & RXRPC_JUMBO_PACKET)
annotation |= RXRPC_RX_ANNO_JLAST;
if (after(seq, hard_ack + call->rx_winsize)) { if (after(seq, hard_ack + call->rx_winsize)) {
ack = RXRPC_ACK_EXCEEDS_WINDOW; ack = RXRPC_ACK_EXCEEDS_WINDOW;
ack_serial = serial; ack_serial = serial;
if (!jumbo_bad) { if (flags & RXRPC_JUMBO_PACKET) {
call->nr_jumbo_bad++; if (!jumbo_bad) {
jumbo_bad = true; call->nr_jumbo_bad++;
jumbo_bad = true;
}
} }
goto ack; goto ack;
} }
_proto("Rx DATA Jumbo %%%u", serial); if (flags & RXRPC_REQUEST_ACK && !ack) {
goto next_subpacket; ack = RXRPC_ACK_REQUESTED;
} ack_serial = serial;
}
/* Queue the packet. We use a couple of memory barriers here as need
* to make sure that rx_top is perceived to be set after the buffer
* pointer and that the buffer pointer is set after the annotation and
* the skb data.
*
* Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
* and also rxrpc_fill_out_ack().
*/
rxrpc_get_skb(skb, rxrpc_skb_rx_got);
call->rxtx_annotations[ix] = annotation;
smp_wmb();
call->rxtx_buffer[ix] = skb;
if (after(seq, call->rx_top)) {
smp_store_release(&call->rx_top, seq);
} else if (before(seq, call->rx_top)) {
/* Send an immediate ACK if we fill in a hole */
if (!ack) {
ack = RXRPC_ACK_DELAY;
ack_serial = serial;
}
immediate_ack = true;
}
if (terminal) {
/* From this point on, we're not allowed to touch the
* packet any longer as its ref now belongs to the Rx
* ring.
*/
skb = NULL;
}
if (queued && flags & RXRPC_LAST_PACKET && !ack) { if (last) {
ack = RXRPC_ACK_DELAY; set_bit(RXRPC_CALL_RX_LAST, &call->flags);
ack_serial = serial; if (!ack) {
ack = RXRPC_ACK_DELAY;
ack_serial = serial;
}
trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
} else {
trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
}
if (after_eq(seq, call->rx_expect_next)) {
if (after(seq, call->rx_expect_next)) {
_net("OOS %u > %u", seq, call->rx_expect_next);
ack = RXRPC_ACK_OUT_OF_SEQUENCE;
ack_serial = serial;
}
call->rx_expect_next = seq + 1;
}
} }
ack: ack:
...@@ -612,7 +609,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb) ...@@ -612,7 +609,7 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
false, true, false, true,
rxrpc_propose_ack_input_data); rxrpc_propose_ack_input_data);
if (sp->hdr.seq == READ_ONCE(call->rx_hard_ack) + 1) { if (seq0 == READ_ONCE(call->rx_hard_ack) + 1) {
trace_rxrpc_notify_socket(call->debug_id, serial); trace_rxrpc_notify_socket(call->debug_id, serial);
rxrpc_notify_socket(call); rxrpc_notify_socket(call);
} }
......
...@@ -177,7 +177,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call) ...@@ -177,7 +177,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
struct sk_buff *skb; struct sk_buff *skb;
rxrpc_serial_t serial; rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top; rxrpc_seq_t hard_ack, top;
u8 flags; bool last = false;
u8 subpacket;
int ix; int ix;
_enter("%d", call->debug_id); _enter("%d", call->debug_id);
...@@ -191,10 +192,13 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call) ...@@ -191,10 +192,13 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
skb = call->rxtx_buffer[ix]; skb = call->rxtx_buffer[ix];
rxrpc_see_skb(skb, rxrpc_skb_rx_rotated); rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
sp = rxrpc_skb(skb); sp = rxrpc_skb(skb);
flags = sp->hdr.flags;
serial = sp->hdr.serial; subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) serial = sp->hdr.serial + subpacket;
serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
if (subpacket == sp->nr_subpackets - 1 &&
sp->rx_flags & RXRPC_SKB_INCL_LAST)
last = true;
call->rxtx_buffer[ix] = NULL; call->rxtx_buffer[ix] = NULL;
call->rxtx_annotations[ix] = 0; call->rxtx_annotations[ix] = 0;
...@@ -203,9 +207,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call) ...@@ -203,9 +207,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
rxrpc_free_skb(skb, rxrpc_skb_rx_freed); rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
_debug("%u,%u,%02x", hard_ack, top, flags);
trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack); trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
if (flags & RXRPC_LAST_PACKET) { if (last) {
rxrpc_end_rx_phase(call, serial); rxrpc_end_rx_phase(call, serial);
} else { } else {
/* Check to see if there's an ACK that needs sending. */ /* Check to see if there's an ACK that needs sending. */
...@@ -233,18 +236,19 @@ static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -233,18 +236,19 @@ static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
rxrpc_seq_t seq = sp->hdr.seq; rxrpc_seq_t seq = sp->hdr.seq;
u16 cksum = sp->hdr.cksum; u16 cksum = sp->hdr.cksum;
u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
_enter(""); _enter("");
/* For all but the head jumbo subpacket, the security checksum is in a /* For all but the head jumbo subpacket, the security checksum is in a
* jumbo header immediately prior to the data. * jumbo header immediately prior to the data.
*/ */
if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) { if (subpacket > 0) {
__be16 tmp; __be16 tmp;
if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0) if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
BUG(); BUG();
cksum = ntohs(tmp); cksum = ntohs(tmp);
seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1; seq += subpacket;
} }
return call->conn->security->verify_packet(call, skb, offset, len, return call->conn->security->verify_packet(call, skb, offset, len,
...@@ -265,19 +269,18 @@ static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb, ...@@ -265,19 +269,18 @@ static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
u8 *_annotation, u8 *_annotation,
unsigned int *_offset, unsigned int *_len) unsigned int *_offset, unsigned int *_len)
{ {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned int offset = sizeof(struct rxrpc_wire_header); unsigned int offset = sizeof(struct rxrpc_wire_header);
unsigned int len; unsigned int len;
int ret; int ret;
u8 annotation = *_annotation; u8 annotation = *_annotation;
u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
/* Locate the subpacket */ /* Locate the subpacket */
offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
len = skb->len - offset; len = skb->len - offset;
if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) { if (subpacket < sp->nr_subpackets - 1)
offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) * len = RXRPC_JUMBO_DATALEN;
RXRPC_JUMBO_SUBPKTLEN);
len = (annotation & RXRPC_RX_ANNO_JLAST) ?
skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
}
if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) { if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
ret = rxrpc_verify_packet(call, skb, annotation, offset, len); ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
...@@ -303,6 +306,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, ...@@ -303,6 +306,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
{ {
struct rxrpc_skb_priv *sp; struct rxrpc_skb_priv *sp;
struct sk_buff *skb; struct sk_buff *skb;
rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top, seq; rxrpc_seq_t hard_ack, top, seq;
size_t remain; size_t remain;
bool last; bool last;
...@@ -339,9 +343,12 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, ...@@ -339,9 +343,12 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
rxrpc_see_skb(skb, rxrpc_skb_rx_seen); rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
sp = rxrpc_skb(skb); sp = rxrpc_skb(skb);
if (!(flags & MSG_PEEK)) if (!(flags & MSG_PEEK)) {
serial = sp->hdr.serial;
serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
trace_rxrpc_receive(call, rxrpc_receive_front, trace_rxrpc_receive(call, rxrpc_receive_front,
sp->hdr.serial, seq); serial, seq);
}
if (msg) if (msg)
sock_recv_timestamp(msg, sock->sk, skb); sock_recv_timestamp(msg, sock->sk, skb);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment