Commit b591c6f6 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-link-changeover-issues'

Tuong Lien says:

====================
tipc: link changeover issues

This patch series is to resolve some issues found with the current link
changeover mechanism, it also includes an optimization for the link
synching.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 37f7c66f 2320bcda
......@@ -180,6 +180,7 @@ struct tipc_link {
/* Fragmentation/reassembly */
struct sk_buff *reasm_buf;
struct sk_buff *reasm_tnlmsg;
/* Broadcast */
u16 ackers;
......@@ -897,8 +898,10 @@ void tipc_link_reset(struct tipc_link *l)
l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
kfree_skb(l->reasm_buf);
kfree_skb(l->reasm_tnlmsg);
kfree_skb(l->failover_reasm_skb);
l->reasm_buf = NULL;
l->reasm_tnlmsg = NULL;
l->failover_reasm_skb = NULL;
l->rcv_unacked = 0;
l->snd_nxt = 1;
......@@ -940,6 +943,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
int rc = 0;
if (unlikely(msg_size(hdr) > mtu)) {
pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n",
skb_queue_len(list), msg_user(hdr),
msg_type(hdr), msg_size(hdr), mtu);
skb_queue_purge(list);
return -EMSGSIZE;
}
......@@ -1233,6 +1239,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
struct sk_buff **reasm_skb = &l->failover_reasm_skb;
struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg;
struct sk_buff_head *fdefq = &l->failover_deferdq;
struct tipc_msg *hdr = buf_msg(skb);
struct sk_buff *iskb;
......@@ -1240,40 +1247,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
int rc = 0;
u16 seqno;
/* SYNCH_MSG */
if (msg_type(hdr) == SYNCH_MSG)
goto drop;
if (msg_type(hdr) == SYNCH_MSG) {
kfree_skb(skb);
return 0;
}
/* FAILOVER_MSG */
if (!tipc_msg_extract(skb, &iskb, &ipos)) {
pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
skb_queue_len(fdefq));
return rc;
/* Not a fragment? */
if (likely(!msg_nof_fragms(hdr))) {
if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) {
pr_warn_ratelimited("Unable to extract msg, defq: %d\n",
skb_queue_len(fdefq));
return 0;
}
kfree_skb(skb);
} else {
/* Set fragment type for buf_append */
if (msg_fragm_no(hdr) == 1)
msg_set_type(hdr, FIRST_FRAGMENT);
else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr))
msg_set_type(hdr, FRAGMENT);
else
msg_set_type(hdr, LAST_FRAGMENT);
if (!tipc_buf_append(reasm_tnlmsg, &skb)) {
/* Successful but non-complete reassembly? */
if (*reasm_tnlmsg || link_is_bc_rcvlink(l))
return 0;
pr_warn_ratelimited("Unable to reassemble tunnel msg\n");
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
iskb = skb;
}
do {
seqno = buf_seqno(iskb);
if (unlikely(less(seqno, l->drop_point))) {
kfree_skb(iskb);
continue;
}
if (unlikely(seqno != l->drop_point)) {
__tipc_skb_queue_sorted(fdefq, seqno, iskb);
continue;
}
l->drop_point++;
if (!tipc_data_input(l, iskb, inputq))
rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
if (unlikely(rc))
break;
} while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
drop:
kfree_skb(skb);
return rc;
}
......@@ -1663,14 +1686,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
struct sk_buff *skb, *tnlskb;
struct tipc_msg *hdr, tnlhdr;
struct sk_buff_head *queue = &l->transmq;
struct sk_buff_head tmpxq, tnlq;
struct sk_buff_head tmpxq, tnlq, frags;
u16 pktlen, pktcnt, seqno = l->snd_nxt;
bool pktcnt_need_update = false;
u16 syncpt;
int rc;
if (!tnl)
return;
skb_queue_head_init(&tnlq);
skb_queue_head_init(&tmpxq);
skb_queue_head_init(&frags);
/* At least one packet required for safe algorithm => add dummy */
skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
......@@ -1684,6 +1711,31 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
tipc_link_xmit(l, &tnlq, &tmpxq);
__skb_queue_purge(&tmpxq);
/* Link Synching:
* From now on, send only one single ("dummy") SYNCH message
* to peer. The SYNCH message does not contain any data, just
* a header conveying the synch point to the peer.
*/
if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG,
INT_H_SIZE, 0, l->addr,
tipc_own_addr(l->net),
0, 0, 0);
if (!tnlskb) {
pr_warn("%sunable to create dummy SYNCH_MSG\n",
link_co_err);
return;
}
hdr = buf_msg(tnlskb);
syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1;
msg_set_syncpt(hdr, syncpt);
msg_set_bearer_id(hdr, l->peer_bearer_id);
__skb_queue_tail(&tnlq, tnlskb);
tipc_link_xmit(tnl, &tnlq, xmitq);
return;
}
/* Initialize reusable tunnel packet header */
tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
mtyp, INT_H_SIZE, l->addr);
......@@ -1701,6 +1753,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
if (queue == &l->backlogq)
msg_set_seqno(hdr, seqno++);
pktlen = msg_size(hdr);
/* Tunnel link MTU is not large enough? This could be
* due to:
* 1) Link MTU has just changed or set differently;
* 2) Or FAILOVER on the top of a SYNCH message
*
* The 2nd case should not happen if peer supports
* TIPC_TUNNEL_ENHANCED
*/
if (pktlen > tnl->mtu - INT_H_SIZE) {
if (mtyp == FAILOVER_MSG &&
(tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) {
rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu,
&frags);
if (rc) {
pr_warn("%sunable to frag msg: rc %d\n",
link_co_err, rc);
return;
}
pktcnt += skb_queue_len(&frags) - 1;
pktcnt_need_update = true;
skb_queue_splice_tail_init(&frags, &tnlq);
continue;
}
/* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED
* => Just warn it and return!
*/
pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n",
link_co_err, msg_user(hdr),
msg_type(hdr), msg_size(hdr));
return;
}
msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC);
if (!tnlskb) {
......@@ -1716,6 +1801,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
goto tnl;
}
if (pktcnt_need_update)
skb_queue_walk(&tnlq, skb) {
hdr = buf_msg(skb);
msg_set_msgcnt(hdr, pktcnt);
}
tipc_link_xmit(tnl, &tnlq, xmitq);
if (mtyp == FAILOVER_MSG) {
......
......@@ -243,6 +243,65 @@ bool tipc_msg_validate(struct sk_buff **_skb)
return true;
}
/**
* tipc_msg_fragment - build a fragment skb list for TIPC message
*
* @skb: TIPC message skb
* @hdr: internal msg header to be put on the top of the fragments
* @pktmax: max size of a fragment incl. the header
* @frags: returned fragment skb list
*
* Returns 0 if the fragmentation is successful, otherwise: -EINVAL
* or -ENOMEM
*/
int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
int pktmax, struct sk_buff_head *frags)
{
int pktno, nof_fragms, dsz, dmax, eat;
struct tipc_msg *_hdr;
struct sk_buff *_skb;
u8 *data;
/* Non-linear buffer? */
if (skb_linearize(skb))
return -ENOMEM;
data = (u8 *)skb->data;
dsz = msg_size(buf_msg(skb));
dmax = pktmax - INT_H_SIZE;
if (dsz <= dmax || !dmax)
return -EINVAL;
nof_fragms = dsz / dmax + 1;
for (pktno = 1; pktno <= nof_fragms; pktno++) {
if (pktno < nof_fragms)
eat = dmax;
else
eat = dsz % dmax;
/* Allocate a new fragment */
_skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC);
if (!_skb)
goto error;
skb_orphan(_skb);
__skb_queue_tail(frags, _skb);
/* Copy header & data to the fragment */
skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE);
skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat);
data += eat;
/* Update the fragment's header */
_hdr = buf_msg(_skb);
msg_set_fragm_no(_hdr, pktno);
msg_set_nof_fragms(_hdr, nof_fragms);
msg_set_size(_hdr, INT_H_SIZE + eat);
}
return 0;
error:
__skb_queue_purge(frags);
__skb_queue_head_init(frags);
return -ENOMEM;
}
/**
* tipc_msg_build - create buffer chain containing specified header and data
* @mhdr: Message header, to be prepended to data
......
......@@ -721,12 +721,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
msg_set_bits(m, 4, 16, 0xffff, n);
}
static inline u32 msg_nof_fragms(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
}
static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 0, 0xffff, n);
}
static inline u32 msg_fragm_no(struct tipc_msg *m)
{
return msg_bits(m, 4, 16, 0xffff);
}
static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 16, 0xffff, n);
}
static inline u16 msg_next_sent(struct tipc_msg *m)
{
return msg_bits(m, 4, 0, 0xffff);
......@@ -877,6 +891,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
static inline u16 msg_syncpt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
static inline void msg_set_syncpt(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
static inline u32 msg_conn_ack(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
......@@ -1035,6 +1059,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu);
bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
u32 mtu, u32 dnode);
bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
int pktmax, struct sk_buff_head *frags);
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
......
......@@ -1649,7 +1649,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
int usr = msg_user(hdr);
int mtyp = msg_type(hdr);
u16 oseqno = msg_seqno(hdr);
u16 iseqno = msg_seqno(msg_inner_hdr(hdr));
u16 exp_pkts = msg_msgcnt(hdr);
u16 rcv_nxt, syncpt, dlv_nxt, inputq_len;
int state = n->state;
......@@ -1748,7 +1747,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
/* Initiate synch mode if applicable */
if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) {
syncpt = iseqno + exp_pkts - 1;
if (n->capabilities & TIPC_TUNNEL_ENHANCED)
syncpt = msg_syncpt(hdr);
else
syncpt = msg_seqno(msg_inner_hdr(hdr)) + exp_pkts - 1;
if (!tipc_link_is_up(l))
__tipc_node_link_up(n, bearer_id, xmitq);
if (n->state == SELF_UP_PEER_UP) {
......
......@@ -53,7 +53,8 @@ enum {
TIPC_NODE_ID128 = (1 << 5),
TIPC_LINK_PROTO_SEQNO = (1 << 6),
TIPC_MCAST_RBCTL = (1 << 7),
TIPC_GAP_ACK_BLOCK = (1 << 8)
TIPC_GAP_ACK_BLOCK = (1 << 8),
TIPC_TUNNEL_ENHANCED = (1 << 9)
};
#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
......@@ -64,7 +65,8 @@ enum {
TIPC_NODE_ID128 | \
TIPC_LINK_PROTO_SEQNO | \
TIPC_MCAST_RBCTL | \
TIPC_GAP_ACK_BLOCK)
TIPC_GAP_ACK_BLOCK | \
TIPC_TUNNEL_ENHANCED)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment