Commit 63e7f1ac authored by Allan Stephens's avatar Allan Stephens Committed by Paul Gortmaker

tipc: Prevent loss of fragmented messages over broadcast link

Modifies broadcast link so that an incoming fragmented message is not
lost if reassembly cannot begin because there currently is no buffer
big enough to hold the entire reassembled message. The broadcast link
now ignores the first fragment completely, which causes the sending node
to retransmit the first fragment so that reassembly can be re-attempted.

Previously, the sender would have had no reason to retransmit the 1st
fragment, so we would never have a chance to re-try the allocation.

To do this cleanly without duplicaton, a new bclink_accept_pkt()
function is introduced.
Signed-off-by: default avatarAllan Stephens <allan.stephens@windriver.com>
Signed-off-by: default avatarPaul Gortmaker <paul.gortmaker@windriver.com>
parent b76b27ca
...@@ -389,7 +389,33 @@ int tipc_bclink_send_msg(struct sk_buff *buf) ...@@ -389,7 +389,33 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
return res; return res;
} }
/** /*
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
* Called with both sending node's lock and bc_lock taken.
*/
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
bclink_update_last_sent(node, seqno);
node->bclink.last_in = seqno;
node->bclink.oos_state = 0;
bcl->stats.recv_info++;
/*
* Unicast an ACK periodically, ensuring that
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_send_proto_msg(
node->active_links[node->addr & 1],
STATE_MSG, 0, 0, 0, 0, 0);
bcl->stats.sent_acks++;
}
}
/*
* tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
* *
* tipc_net_lock is read_locked, no other locks set * tipc_net_lock is read_locked, no other locks set
...@@ -443,29 +469,12 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) ...@@ -443,29 +469,12 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
next_in = mod(node->bclink.last_in + 1); next_in = mod(node->bclink.last_in + 1);
if (likely(seqno == next_in)) { if (likely(seqno == next_in)) {
bclink_update_last_sent(node, seqno);
receive: receive:
node->bclink.last_in = seqno;
node->bclink.oos_state = 0;
spin_lock_bh(&bc_lock);
bcl->stats.recv_info++;
/*
* Unicast an ACK periodically, ensuring that
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
tipc_link_send_proto_msg(
node->active_links[node->addr & 1],
STATE_MSG, 0, 0, 0, 0, 0);
bcl->stats.sent_acks++;
}
/* Deliver message to destination */ /* Deliver message to destination */
if (likely(msg_isdata(msg))) { if (likely(msg_isdata(msg))) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
if (likely(msg_mcast(msg))) if (likely(msg_mcast(msg)))
...@@ -473,24 +482,35 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf) ...@@ -473,24 +482,35 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
else else
buf_discard(buf); buf_discard(buf);
} else if (msg_user(msg) == MSG_BUNDLER) { } else if (msg_user(msg) == MSG_BUNDLER) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++; bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg); bcl->stats.recv_bundled += msg_msgcnt(msg);
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_link_recv_bundle(buf); tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret = tipc_link_recv_fragment(&node->bclink.defragm,
&buf, &msg);
if (ret < 0)
goto unlock;
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++; bcl->stats.recv_fragments++;
if (tipc_link_recv_fragment(&node->bclink.defragm, if (ret > 0)
&buf, &msg))
bcl->stats.recv_fragmented++; bcl->stats.recv_fragmented++;
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_net_route_msg(buf); tipc_net_route_msg(buf);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) { } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
tipc_named_recv(buf); tipc_named_recv(buf);
} else { } else {
spin_lock_bh(&bc_lock);
bclink_accept_pkt(node, seqno);
spin_unlock_bh(&bc_lock); spin_unlock_bh(&bc_lock);
tipc_node_unlock(node); tipc_node_unlock(node);
buf_discard(buf); buf_discard(buf);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment