Commit 3136ef49 authored by David Howells's avatar David Howells

rxrpc: Delay terminal ACK transmission on a client call

Delay terminal ACK transmission on a client call by deferring it to the
connection processor.  This allows it to be skipped if we can send the next
call instead, the first DATA packet of which will implicitly ack this call.
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent 9faaff59
...@@ -338,8 +338,17 @@ enum rxrpc_conn_flag { ...@@ -338,8 +338,17 @@ enum rxrpc_conn_flag {
RXRPC_CONN_DONT_REUSE, /* Don't reuse this connection */ RXRPC_CONN_DONT_REUSE, /* Don't reuse this connection */
RXRPC_CONN_COUNTED, /* Counted by rxrpc_nr_client_conns */ RXRPC_CONN_COUNTED, /* Counted by rxrpc_nr_client_conns */
RXRPC_CONN_PROBING_FOR_UPGRADE, /* Probing for service upgrade */ RXRPC_CONN_PROBING_FOR_UPGRADE, /* Probing for service upgrade */
RXRPC_CONN_FINAL_ACK_0, /* Need final ACK for channel 0 */
RXRPC_CONN_FINAL_ACK_1, /* Need final ACK for channel 1 */
RXRPC_CONN_FINAL_ACK_2, /* Need final ACK for channel 2 */
RXRPC_CONN_FINAL_ACK_3, /* Need final ACK for channel 3 */
}; };
#define RXRPC_CONN_FINAL_ACK_MASK ((1UL << RXRPC_CONN_FINAL_ACK_0) | \
(1UL << RXRPC_CONN_FINAL_ACK_1) | \
(1UL << RXRPC_CONN_FINAL_ACK_2) | \
(1UL << RXRPC_CONN_FINAL_ACK_3))
/* /*
* Events that can be raised upon a connection. * Events that can be raised upon a connection.
*/ */
...@@ -393,6 +402,7 @@ struct rxrpc_connection { ...@@ -393,6 +402,7 @@ struct rxrpc_connection {
#define RXRPC_ACTIVE_CHANS_MASK ((1 << RXRPC_MAXCALLS) - 1) #define RXRPC_ACTIVE_CHANS_MASK ((1 << RXRPC_MAXCALLS) - 1)
struct list_head waiting_calls; /* Calls waiting for channels */ struct list_head waiting_calls; /* Calls waiting for channels */
struct rxrpc_channel { struct rxrpc_channel {
unsigned long final_ack_at; /* Time at which to issue final ACK */
struct rxrpc_call __rcu *call; /* Active call */ struct rxrpc_call __rcu *call; /* Active call */
u32 call_id; /* ID of current call */ u32 call_id; /* ID of current call */
u32 call_counter; /* Call ID counter */ u32 call_counter; /* Call ID counter */
...@@ -404,6 +414,7 @@ struct rxrpc_connection { ...@@ -404,6 +414,7 @@ struct rxrpc_connection {
}; };
} channels[RXRPC_MAXCALLS]; } channels[RXRPC_MAXCALLS];
struct timer_list timer; /* Conn event timer */
struct work_struct processor; /* connection event processor */ struct work_struct processor; /* connection event processor */
union { union {
struct rb_node client_node; /* Node in local->client_conns */ struct rb_node client_node; /* Node in local->client_conns */
...@@ -861,6 +872,12 @@ static inline void rxrpc_put_connection(struct rxrpc_connection *conn) ...@@ -861,6 +872,12 @@ static inline void rxrpc_put_connection(struct rxrpc_connection *conn)
rxrpc_put_service_conn(conn); rxrpc_put_service_conn(conn);
} }
static inline void rxrpc_reduce_conn_timer(struct rxrpc_connection *conn,
unsigned long expire_at)
{
timer_reduce(&conn->timer, expire_at);
}
/* /*
* conn_service.c * conn_service.c
*/ */
......
...@@ -554,6 +554,11 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn, ...@@ -554,6 +554,11 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate); trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
/* Cancel the final ACK on the previous call if it hasn't been sent yet
* as the DATA packet will implicitly ACK it.
*/
clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
write_lock_bh(&call->state_lock); write_lock_bh(&call->state_lock);
if (!test_bit(RXRPC_CALL_TX_LASTQ, &call->flags)) if (!test_bit(RXRPC_CALL_TX_LASTQ, &call->flags))
call->state = RXRPC_CALL_CLIENT_SEND_REQUEST; call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
...@@ -813,6 +818,19 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call) ...@@ -813,6 +818,19 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
goto out_2; goto out_2;
} }
/* Schedule the final ACK to be transmitted in a short while so that it
* can be skipped if we find a follow-on call. The first DATA packet
* of the follow on call will implicitly ACK this call.
*/
if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
unsigned long final_ack_at = jiffies + 2;
WRITE_ONCE(chan->final_ack_at, final_ack_at);
smp_wmb(); /* vs rxrpc_process_delayed_final_acks() */
set_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
rxrpc_reduce_conn_timer(conn, final_ack_at);
}
/* Things are more complex and we need the cache lock. We might be /* Things are more complex and we need the cache lock. We might be
* able to simply idle the conn or it might now be lurking on the wait * able to simply idle the conn or it might now be lurking on the wait
* list. It might even get moved back to the active list whilst we're * list. It might even get moved back to the active list whilst we're
......
...@@ -24,9 +24,10 @@ ...@@ -24,9 +24,10 @@
* Retransmit terminal ACK or ABORT of the previous call. * Retransmit terminal ACK or ABORT of the previous call.
*/ */
static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
struct sk_buff *skb) struct sk_buff *skb,
unsigned int channel)
{ {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_skb_priv *sp = skb ? rxrpc_skb(skb) : NULL;
struct rxrpc_channel *chan; struct rxrpc_channel *chan;
struct msghdr msg; struct msghdr msg;
struct kvec iov; struct kvec iov;
...@@ -48,7 +49,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, ...@@ -48,7 +49,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
_enter("%d", conn->debug_id); _enter("%d", conn->debug_id);
chan = &conn->channels[sp->hdr.cid & RXRPC_CHANNELMASK]; chan = &conn->channels[channel];
/* If the last call got moved on whilst we were waiting to run, just /* If the last call got moved on whilst we were waiting to run, just
* ignore this packet. * ignore this packet.
...@@ -56,7 +57,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, ...@@ -56,7 +57,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
call_id = READ_ONCE(chan->last_call); call_id = READ_ONCE(chan->last_call);
/* Sync with __rxrpc_disconnect_call() */ /* Sync with __rxrpc_disconnect_call() */
smp_rmb(); smp_rmb();
if (call_id != sp->hdr.callNumber) if (skb && call_id != sp->hdr.callNumber)
return; return;
msg.msg_name = &conn->params.peer->srx.transport; msg.msg_name = &conn->params.peer->srx.transport;
...@@ -65,9 +66,9 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, ...@@ -65,9 +66,9 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
msg.msg_controllen = 0; msg.msg_controllen = 0;
msg.msg_flags = 0; msg.msg_flags = 0;
pkt.whdr.epoch = htonl(sp->hdr.epoch); pkt.whdr.epoch = htonl(conn->proto.epoch);
pkt.whdr.cid = htonl(sp->hdr.cid); pkt.whdr.cid = htonl(conn->proto.cid);
pkt.whdr.callNumber = htonl(sp->hdr.callNumber); pkt.whdr.callNumber = htonl(call_id);
pkt.whdr.seq = 0; pkt.whdr.seq = 0;
pkt.whdr.type = chan->last_type; pkt.whdr.type = chan->last_type;
pkt.whdr.flags = conn->out_clientflag; pkt.whdr.flags = conn->out_clientflag;
...@@ -87,11 +88,11 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, ...@@ -87,11 +88,11 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
mtu = conn->params.peer->if_mtu; mtu = conn->params.peer->if_mtu;
mtu -= conn->params.peer->hdrsize; mtu -= conn->params.peer->hdrsize;
pkt.ack.bufferSpace = 0; pkt.ack.bufferSpace = 0;
pkt.ack.maxSkew = htons(skb->priority); pkt.ack.maxSkew = htons(skb ? skb->priority : 0);
pkt.ack.firstPacket = htonl(chan->last_seq); pkt.ack.firstPacket = htonl(chan->last_seq + 1);
pkt.ack.previousPacket = htonl(chan->last_seq - 1); pkt.ack.previousPacket = htonl(chan->last_seq);
pkt.ack.serial = htonl(sp->hdr.serial); pkt.ack.serial = htonl(skb ? sp->hdr.serial : 0);
pkt.ack.reason = RXRPC_ACK_DUPLICATE; pkt.ack.reason = skb ? RXRPC_ACK_DUPLICATE : RXRPC_ACK_IDLE;
pkt.ack.nAcks = 0; pkt.ack.nAcks = 0;
pkt.info.rxMTU = htonl(rxrpc_rx_mtu); pkt.info.rxMTU = htonl(rxrpc_rx_mtu);
pkt.info.maxMTU = htonl(mtu); pkt.info.maxMTU = htonl(mtu);
...@@ -272,7 +273,8 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, ...@@ -272,7 +273,8 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
switch (sp->hdr.type) { switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA: case RXRPC_PACKET_TYPE_DATA:
case RXRPC_PACKET_TYPE_ACK: case RXRPC_PACKET_TYPE_ACK:
rxrpc_conn_retransmit_call(conn, skb); rxrpc_conn_retransmit_call(conn, skb,
sp->hdr.cid & RXRPC_CHANNELMASK);
return 0; return 0;
case RXRPC_PACKET_TYPE_BUSY: case RXRPC_PACKET_TYPE_BUSY:
...@@ -378,6 +380,48 @@ static void rxrpc_secure_connection(struct rxrpc_connection *conn) ...@@ -378,6 +380,48 @@ static void rxrpc_secure_connection(struct rxrpc_connection *conn)
_leave(" [aborted]"); _leave(" [aborted]");
} }
/*
* Process delayed final ACKs that we haven't subsumed into a subsequent call.
*/
static void rxrpc_process_delayed_final_acks(struct rxrpc_connection *conn)
{
unsigned long j = jiffies, next_j;
unsigned int channel;
bool set;
again:
next_j = j + LONG_MAX;
set = false;
for (channel = 0; channel < RXRPC_MAXCALLS; channel++) {
struct rxrpc_channel *chan = &conn->channels[channel];
unsigned long ack_at;
if (!test_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags))
continue;
smp_rmb(); /* vs rxrpc_disconnect_client_call */
ack_at = READ_ONCE(chan->final_ack_at);
if (time_before(j, ack_at)) {
if (time_before(ack_at, next_j)) {
next_j = ack_at;
set = true;
}
continue;
}
if (test_and_clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel,
&conn->flags))
rxrpc_conn_retransmit_call(conn, NULL, channel);
}
j = jiffies;
if (time_before_eq(next_j, j))
goto again;
if (set)
rxrpc_reduce_conn_timer(conn, next_j);
}
/* /*
* connection-level event processor * connection-level event processor
*/ */
...@@ -394,6 +438,10 @@ void rxrpc_process_connection(struct work_struct *work) ...@@ -394,6 +438,10 @@ void rxrpc_process_connection(struct work_struct *work)
if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events)) if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
rxrpc_secure_connection(conn); rxrpc_secure_connection(conn);
/* Process delayed ACKs whose time has come. */
if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
rxrpc_process_delayed_final_acks(conn);
/* go through the conn-level event packets, releasing the ref on this /* go through the conn-level event packets, releasing the ref on this
* connection that each one has when we've finished with it */ * connection that each one has when we've finished with it */
while ((skb = skb_dequeue(&conn->rx_queue))) { while ((skb = skb_dequeue(&conn->rx_queue))) {
......
...@@ -24,6 +24,14 @@ unsigned int rxrpc_connection_expiry = 10 * 60; ...@@ -24,6 +24,14 @@ unsigned int rxrpc_connection_expiry = 10 * 60;
static void rxrpc_destroy_connection(struct rcu_head *); static void rxrpc_destroy_connection(struct rcu_head *);
static void rxrpc_connection_timer(struct timer_list *timer)
{
struct rxrpc_connection *conn =
container_of(timer, struct rxrpc_connection, timer);
rxrpc_queue_conn(conn);
}
/* /*
* allocate a new connection * allocate a new connection
*/ */
...@@ -38,6 +46,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) ...@@ -38,6 +46,7 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
INIT_LIST_HEAD(&conn->cache_link); INIT_LIST_HEAD(&conn->cache_link);
spin_lock_init(&conn->channel_lock); spin_lock_init(&conn->channel_lock);
INIT_LIST_HEAD(&conn->waiting_calls); INIT_LIST_HEAD(&conn->waiting_calls);
timer_setup(&conn->timer, &rxrpc_connection_timer, 0);
INIT_WORK(&conn->processor, &rxrpc_process_connection); INIT_WORK(&conn->processor, &rxrpc_process_connection);
INIT_LIST_HEAD(&conn->proc_link); INIT_LIST_HEAD(&conn->proc_link);
INIT_LIST_HEAD(&conn->link); INIT_LIST_HEAD(&conn->link);
...@@ -332,6 +341,7 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu) ...@@ -332,6 +341,7 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
_net("DESTROY CONN %d", conn->debug_id); _net("DESTROY CONN %d", conn->debug_id);
del_timer_sync(&conn->timer);
rxrpc_purge_queue(&conn->rx_queue); rxrpc_purge_queue(&conn->rx_queue);
conn->security->clear(conn); conn->security->clear(conn);
......
...@@ -144,11 +144,13 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) ...@@ -144,11 +144,13 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top); trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
ASSERTCMP(call->rx_hard_ack, ==, call->rx_top); ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
#if 0 // TODO: May want to transmit final ACK under some circumstances anyway
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) { if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false, rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false,
rxrpc_propose_ack_terminal_ack); rxrpc_propose_ack_terminal_ack);
rxrpc_send_ack_packet(call, false); rxrpc_send_ack_packet(call, false);
} }
#endif
write_lock_bh(&call->state_lock); write_lock_bh(&call->state_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment