Commit 8e6e85e6 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-next'

Erik Hugne says:

====================
tipc: link state processing improvements

Message delivery is separated from the link state processing, and
we fix a bug in receive-path triggered acks.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 86c6a2c7 3f53bd8f
...@@ -88,6 +88,8 @@ static void link_print(struct tipc_link *l_ptr, const char *str); ...@@ -88,6 +88,8 @@ static void link_print(struct tipc_link *l_ptr, const char *str);
static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
static void tipc_link_sync_xmit(struct tipc_link *l); static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
/* /*
* Simple link routines * Simple link routines
...@@ -1420,11 +1422,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1420,11 +1422,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (unlikely(!list_empty(&l_ptr->waiting_ports))) if (unlikely(!list_empty(&l_ptr->waiting_ports)))
tipc_link_wakeup_ports(l_ptr, 0); tipc_link_wakeup_ports(l_ptr, 0);
if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
l_ptr->stats.sent_acks++;
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
/* Process the incoming packet */ /* Process the incoming packet */
if (unlikely(!link_working_working(l_ptr))) { if (unlikely(!link_working_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) { if (msg_user(msg) == LINK_PROTOCOL) {
...@@ -1458,54 +1455,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1458,54 +1455,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if (unlikely(l_ptr->oldest_deferred_in)) if (unlikely(l_ptr->oldest_deferred_in))
head = link_insert_deferred_queue(l_ptr, head); head = link_insert_deferred_queue(l_ptr, head);
/* Deliver packet/message to correct user: */ if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) { l_ptr->stats.sent_acks++;
if (!tipc_link_tunnel_rcv(n_ptr, &buf)) { tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
tipc_node_unlock(n_ptr);
continue;
}
msg = buf_msg(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
l_ptr->stats.recv_fragments++;
if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) {
l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf);
} else {
if (!l_ptr->reasm_buf)
tipc_link_reset(l_ptr);
tipc_node_unlock(n_ptr);
continue;
}
} }
switch (msg_user(msg)) { if (tipc_link_prepare_input(l_ptr, &buf)) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
case CONN_MANAGER:
tipc_node_unlock(n_ptr);
tipc_sk_rcv(buf);
continue;
case MSG_BUNDLER:
l_ptr->stats.recv_bundles++;
l_ptr->stats.recv_bundled += msg_msgcnt(msg);
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
tipc_link_bundle_rcv(buf);
continue; continue;
case NAME_DISTRIBUTOR:
n_ptr->bclink.recv_permitted = true;
tipc_node_unlock(n_ptr);
tipc_named_rcv(buf);
continue;
case BCAST_PROTOCOL:
tipc_link_sync_rcv(n_ptr, buf);
break;
default:
kfree_skb(buf);
break;
} }
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
msg = buf_msg(buf);
if (tipc_link_input(l_ptr, buf) != 0)
goto discard;
continue; continue;
unlock_discard: unlock_discard:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
...@@ -1514,6 +1476,80 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) ...@@ -1514,6 +1476,80 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
} }
} }
/**
* tipc_link_prepare_input - process TIPC link messages
*
* returns nonzero if the message was consumed
*
* Node lock must be held
*/
static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
{
struct tipc_node *n;
struct tipc_msg *msg;
int res = -EINVAL;
n = l->owner;
msg = buf_msg(*buf);
switch (msg_user(msg)) {
case CHANGEOVER_PROTOCOL:
if (tipc_link_tunnel_rcv(n, buf))
res = 0;
break;
case MSG_FRAGMENTER:
l->stats.recv_fragments++;
if (tipc_buf_append(&l->reasm_buf, buf)) {
l->stats.recv_fragmented++;
res = 0;
} else if (!l->reasm_buf) {
tipc_link_reset(l);
}
break;
case MSG_BUNDLER:
l->stats.recv_bundles++;
l->stats.recv_bundled += msg_msgcnt(msg);
res = 0;
break;
case NAME_DISTRIBUTOR:
n->bclink.recv_permitted = true;
res = 0;
break;
case BCAST_PROTOCOL:
tipc_link_sync_rcv(n, *buf);
break;
default:
res = 0;
}
return res;
}
/**
* tipc_link_input - Deliver message too higher layers
*/
static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
int res = 0;
switch (msg_user(msg)) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
case CONN_MANAGER:
tipc_sk_rcv(buf);
break;
case NAME_DISTRIBUTOR:
tipc_named_rcv(buf);
break;
case MSG_BUNDLER:
tipc_link_bundle_rcv(buf);
break;
default:
res = -EINVAL;
}
return res;
}
/** /**
* tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
* *
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment