Commit 6a862a44 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-add-some-improvements'

Tuong Lien says:

====================
tipc: add some improvements

This series adds some improvements to TIPC.

The first patch improves the TIPC broadcast's performance with the 'Gap
ACK blocks' mechanism similar to unicast before, while the others give
support on tracing & statistics for broadcast links, and an alternative
to carry broadcast retransmissions via unicast which might be useful in
some cases.

Besides, the Nagle algorithm can now automatically 'adjust' itself
depending on the specific network condition a stream connection runs by
the last patch.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents ff937b91 0a3e060f
......@@ -46,6 +46,7 @@
#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
const char tipc_bclink_name[] = "broadcast-link";
unsigned long sysctl_tipc_bc_retruni __read_mostly;
/**
* struct tipc_bc_base - base structure for keeping broadcast send state
......@@ -474,7 +475,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
__skb_queue_head_init(&xmitq);
tipc_bcast_lock(net);
tipc_link_bc_ack_rcv(l, acked, &xmitq);
tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
tipc_bcast_unlock(net);
tipc_bcbase_xmit(net, &xmitq);
......@@ -489,9 +490,11 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
* RCU is locked, no other locks set
*/
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr)
struct tipc_msg *hdr,
struct sk_buff_head *retrq)
{
struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
struct tipc_gap_ack_blks *ga;
struct sk_buff_head xmitq;
int rc = 0;
......@@ -501,8 +504,13 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
if (msg_type(hdr) != STATE_MSG) {
tipc_link_bc_init_rcv(l, hdr);
} else if (!msg_bc_ack_invalid(hdr)) {
tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
tipc_get_gap_ack_blks(&ga, l, hdr, false);
if (!sysctl_tipc_bc_retruni)
retrq = &xmitq;
rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
msg_bc_gap(hdr), ga, &xmitq,
retrq);
rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
}
tipc_bcast_unlock(net);
......@@ -555,10 +563,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
tipc_sk_rcv(net, inputq);
}
int tipc_bclink_reset_stats(struct net *net)
int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
{
struct tipc_link *l = tipc_bc_sndlink(net);
if (!l)
return -ENOPROTOOPT;
......@@ -686,7 +692,7 @@ int tipc_bcast_init(struct net *net)
tn->bcbase = bb;
spin_lock_init(&tipc_net(net)->bclock);
if (!tipc_link_bc_create(net, 0, 0,
if (!tipc_link_bc_create(net, 0, 0, NULL,
FB_MTU,
BCLINK_WIN_DEFAULT,
BCLINK_WIN_DEFAULT,
......
......@@ -45,6 +45,7 @@ struct tipc_nl_msg;
struct tipc_nlist;
struct tipc_nitem;
extern const char tipc_bclink_name[];
extern unsigned long sysctl_tipc_bc_retruni;
#define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)
......@@ -93,10 +94,12 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
struct tipc_msg *hdr);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
struct tipc_msg *hdr,
struct sk_buff_head *retrq);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
struct tipc_link *bcl);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_bclink_reset_stats(struct net *net);
int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
u32 tipc_bcast_get_broadcast_mode(struct net *net);
u32 tipc_bcast_get_broadcast_ratio(struct net *net);
......
This diff is collapsed.
......@@ -80,7 +80,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
struct sk_buff_head *inputq,
struct sk_buff_head *namedq,
struct tipc_link **link);
bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, u8 *peer_id,
int mtu, u32 min_win, u32 max_win, u16 peer_caps,
struct sk_buff_head *inputq,
struct sk_buff_head *namedq,
......@@ -111,7 +111,6 @@ u16 tipc_link_rcv_nxt(struct tipc_link *l);
u16 tipc_link_acked(struct tipc_link *l);
u32 tipc_link_id(struct tipc_link *l);
char *tipc_link_name(struct tipc_link *l);
char *tipc_link_name_ext(struct tipc_link *l, char *buf);
u32 tipc_link_state(struct tipc_link *l);
char tipc_link_plane(struct tipc_link *l);
int tipc_link_prio(struct tipc_link *l);
......@@ -143,8 +142,12 @@ int tipc_link_bc_peers(struct tipc_link *l);
void tipc_link_set_mtu(struct tipc_link *l, int mtu);
int tipc_link_mtu(struct tipc_link *l);
int tipc_link_mss(struct tipc_link *l);
void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
struct sk_buff_head *xmitq);
u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
struct tipc_msg *hdr, bool uc);
int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
struct tipc_gap_ack_blks *ga,
struct sk_buff_head *xmitq,
struct sk_buff_head *retrq);
void tipc_link_build_bc_sync_msg(struct tipc_link *l,
struct sk_buff_head *xmitq);
void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
......
......@@ -235,9 +235,6 @@ int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
msg_set_size(hdr, MIN_H_SIZE);
__skb_queue_tail(txq, skb);
total += 1;
if (prev)
msg_set_ack_required(buf_msg(prev), 0);
msg_set_ack_required(hdr, 1);
}
hdr = buf_msg(skb);
curr = msg_blocks(hdr);
......@@ -825,19 +822,19 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
* @seqno: sequence number of buffer to add
* @skb: buffer to add
*/
void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
struct sk_buff *skb)
{
struct sk_buff *_skb, *tmp;
if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
__skb_queue_head(list, skb);
return;
return true;
}
if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
__skb_queue_tail(list, skb);
return;
return true;
}
skb_queue_walk_safe(list, _skb, tmp) {
......@@ -846,9 +843,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
if (seqno == buf_seqno(_skb))
break;
__skb_queue_before(list, _skb, skb);
return;
return true;
}
kfree_skb(skb);
return false;
}
void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
......
......@@ -160,20 +160,39 @@ struct tipc_gap_ack {
/* struct tipc_gap_ack_blks
* @len: actual length of the record
* @gack_cnt: number of Gap ACK blocks in the record
* @ugack_cnt: number of Gap ACK blocks for unicast (following the broadcast
* ones)
* @start_index: starting index for "valid" broadcast Gap ACK blocks
* @bgack_cnt: number of Gap ACK blocks for broadcast in the record
* @gacks: array of Gap ACK blocks
*
* 31 16 15 0
* +-------------+-------------+-------------+-------------+
* | bgack_cnt | ugack_cnt | len |
* +-------------+-------------+-------------+-------------+ -
* | gap | ack | |
* +-------------+-------------+-------------+-------------+ > bc gacks
* : : : |
* +-------------+-------------+-------------+-------------+ -
* | gap | ack | |
* +-------------+-------------+-------------+-------------+ > uc gacks
* : : : |
* +-------------+-------------+-------------+-------------+ -
*/
struct tipc_gap_ack_blks {
__be16 len;
u8 gack_cnt;
u8 reserved;
union {
u8 ugack_cnt;
u8 start_index;
};
u8 bgack_cnt;
struct tipc_gap_ack gacks[];
};
#define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \
sizeof(struct tipc_gap_ack) * (n))
#define MAX_GAP_ACK_BLKS 32
#define MAX_GAP_ACK_BLKS 128
#define MAX_GAP_ACK_BLKS_SZ tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS)
static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
......@@ -321,9 +340,19 @@ static inline int msg_ack_required(struct tipc_msg *m)
return msg_bits(m, 0, 18, 1);
}
static inline void msg_set_ack_required(struct tipc_msg *m, u32 d)
static inline void msg_set_ack_required(struct tipc_msg *m)
{
msg_set_bits(m, 0, 18, 1, d);
msg_set_bits(m, 0, 18, 1, 1);
}
static inline int msg_nagle_ack(struct tipc_msg *m)
{
return msg_bits(m, 0, 18, 1);
}
static inline void msg_set_nagle_ack(struct tipc_msg *m)
{
msg_set_bits(m, 0, 18, 1, 1);
}
static inline bool msg_is_rcast(struct tipc_msg *m)
......@@ -1126,7 +1155,7 @@ bool tipc_msg_assemble(struct sk_buff_head *list);
bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
struct sk_buff_head *cpy);
void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
struct sk_buff *skb);
bool tipc_msg_skb_clone(struct sk_buff_head *msg, struct sk_buff_head *cpy);
......
......@@ -188,7 +188,7 @@ static const struct genl_ops tipc_genl_v2_ops[] = {
},
{
.cmd = TIPC_NL_LINK_GET,
.validate = GENL_DONT_VALIDATE_STRICT | GENL_DONT_VALIDATE_DUMP,
.validate = GENL_DONT_VALIDATE_STRICT,
.doit = tipc_nl_node_get_link,
.dumpit = tipc_nl_node_dump_link,
},
......
......@@ -1138,7 +1138,7 @@ void tipc_node_check_dest(struct net *net, u32 addr,
if (unlikely(!n->bc_entry.link)) {
snd_l = tipc_bc_sndlink(net);
if (!tipc_link_bc_create(net, tipc_own_addr(net),
addr, U16_MAX,
addr, peer_id, U16_MAX,
tipc_link_min_win(snd_l),
tipc_link_max_win(snd_l),
n->capabilities,
......@@ -1772,7 +1772,7 @@ static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
struct tipc_link *ucl;
int rc;
rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr);
rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr, xmitq);
if (rc & TIPC_LINK_DOWN_EVT) {
tipc_node_reset_links(n);
......@@ -2071,10 +2071,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
le = &n->links[bearer_id];
/* Ensure broadcast reception is in synch with peer's send state */
if (unlikely(usr == LINK_PROTOCOL))
if (unlikely(usr == LINK_PROTOCOL)) {
if (unlikely(skb_linearize(skb))) {
tipc_node_put(n);
goto discard;
}
hdr = buf_msg(skb);
tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
} else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack)) {
tipc_bcast_ack_rcv(net, n->bc_entry.link, hdr);
}
/* Receive packet directly if conditions permit */
tipc_node_read_lock(n);
......@@ -2429,7 +2435,7 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
return -ENOMEM;
if (strcmp(name, tipc_bclink_name) == 0) {
err = tipc_nl_add_bc_link(net, &msg);
err = tipc_nl_add_bc_link(net, &msg, tipc_net(net)->bcl);
if (err)
goto err_free;
} else {
......@@ -2473,6 +2479,7 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
struct tipc_node *node;
struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
struct net *net = sock_net(skb->sk);
struct tipc_net *tn = tipc_net(net);
struct tipc_link_entry *le;
if (!info->attrs[TIPC_NLA_LINK])
......@@ -2489,11 +2496,26 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
if (strcmp(link_name, tipc_bclink_name) == 0) {
err = tipc_bclink_reset_stats(net);
err = -EINVAL;
if (!strcmp(link_name, tipc_bclink_name)) {
err = tipc_bclink_reset_stats(net, tipc_bc_sndlink(net));
if (err)
return err;
return 0;
} else if (strstr(link_name, tipc_bclink_name)) {
rcu_read_lock();
list_for_each_entry_rcu(node, &tn->node_list, list) {
tipc_node_read_lock(node);
link = node->bc_entry.link;
if (link && !strcmp(link_name, tipc_link_name(link))) {
err = tipc_bclink_reset_stats(net, link);
tipc_node_read_unlock(node);
break;
}
tipc_node_read_unlock(node);
}
rcu_read_unlock();
return err;
}
node = tipc_node_find_by_name(net, link_name, &bearer_id);
......@@ -2517,7 +2539,8 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
/* Caller should hold node lock */
static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
struct tipc_node *node, u32 *prev_link)
struct tipc_node *node, u32 *prev_link,
bool bc_link)
{
u32 i;
int err;
......@@ -2533,6 +2556,14 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
if (err)
return err;
}
if (bc_link) {
*prev_link = i;
err = tipc_nl_add_bc_link(net, msg, node->bc_entry.link);
if (err)
return err;
}
*prev_link = 0;
return 0;
......@@ -2541,17 +2572,36 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
{
struct net *net = sock_net(skb->sk);
struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
struct nlattr *link[TIPC_NLA_LINK_MAX + 1];
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *node;
struct tipc_nl_msg msg;
u32 prev_node = cb->args[0];
u32 prev_link = cb->args[1];
int done = cb->args[2];
bool bc_link = cb->args[3];
int err;
if (done)
return 0;
if (!prev_node) {
/* Check if broadcast-receiver links dumping is needed */
if (attrs && attrs[TIPC_NLA_LINK]) {
err = nla_parse_nested_deprecated(link,
TIPC_NLA_LINK_MAX,
attrs[TIPC_NLA_LINK],
tipc_nl_link_policy,
NULL);
if (unlikely(err))
return err;
if (unlikely(!link[TIPC_NLA_LINK_BROADCAST]))
return -EINVAL;
bc_link = true;
}
}
msg.skb = skb;
msg.portid = NETLINK_CB(cb->skb).portid;
msg.seq = cb->nlh->nlmsg_seq;
......@@ -2575,7 +2625,7 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
list) {
tipc_node_read_lock(node);
err = __tipc_nl_add_node_links(net, &msg, node,
&prev_link);
&prev_link, bc_link);
tipc_node_read_unlock(node);
if (err)
goto out;
......@@ -2583,14 +2633,14 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
prev_node = node->addr;
}
} else {
err = tipc_nl_add_bc_link(net, &msg);
err = tipc_nl_add_bc_link(net, &msg, tn->bcl);
if (err)
goto out;
list_for_each_entry_rcu(node, &tn->node_list, list) {
tipc_node_read_lock(node);
err = __tipc_nl_add_node_links(net, &msg, node,
&prev_link);
&prev_link, bc_link);
tipc_node_read_unlock(node);
if (err)
goto out;
......@@ -2605,6 +2655,7 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
cb->args[0] = prev_node;
cb->args[1] = prev_link;
cb->args[2] = done;
cb->args[3] = bc_link;
return skb->len;
}
......
......@@ -48,6 +48,8 @@
#include "group.h"
#include "trace.h"
#define NAGLE_START_INIT 4
#define NAGLE_START_MAX 1024
#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
#define CONN_PROBING_INTV msecs_to_jiffies(3600000) /* [ms] => 1 h */
#define TIPC_FWD_MSG 1
......@@ -119,7 +121,10 @@ struct tipc_sock {
struct rcu_head rcu;
struct tipc_group *group;
u32 oneway;
u32 nagle_start;
u16 snd_backlog;
u16 msg_acc;
u16 pkt_cnt;
bool expect_ack;
bool nodelay;
bool group_is_open;
......@@ -143,7 +148,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk);
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
......@@ -474,6 +479,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
tsk->maxnagle = 0;
tsk->nagle_start = NAGLE_START_INIT;
INIT_LIST_HEAD(&tsk->publications);
INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr;
......@@ -541,7 +547,7 @@ static void __tipc_shutdown(struct socket *sock, int error)
!tsk_conn_cong(tsk)));
/* Push out delayed messages if in Nagle mode */
tipc_sk_push_backlog(tsk);
tipc_sk_push_backlog(tsk, false);
/* Remove pending SYN */
__skb_queue_purge(&sk->sk_write_queue);
......@@ -1252,14 +1258,37 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
* when socket is in Nagle mode
*/
static void tipc_sk_push_backlog(struct tipc_sock *tsk)
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
struct sk_buff *skb = skb_peek_tail(txq);
struct net *net = sock_net(&tsk->sk);
u32 dnode = tsk_peer_node(tsk);
struct sk_buff *skb = skb_peek(txq);
int rc;
if (nagle_ack) {
tsk->pkt_cnt += skb_queue_len(txq);
if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
tsk->oneway = 0;
if (tsk->nagle_start < NAGLE_START_MAX)
tsk->nagle_start *= 2;
tsk->expect_ack = false;
pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
tsk->nagle_start);
} else {
tsk->nagle_start = NAGLE_START_INIT;
if (skb) {
msg_set_ack_required(buf_msg(skb));
tsk->expect_ack = true;
} else {
tsk->expect_ack = false;
}
}
tsk->msg_acc = 0;
tsk->pkt_cnt = 0;
}
if (!skb || tsk->cong_link_cnt)
return;
......@@ -1267,9 +1296,10 @@ static void tipc_sk_push_backlog(struct tipc_sock *tsk)
if (msg_is_syn(buf_msg(skb)))
return;
if (tsk->msg_acc)
tsk->pkt_cnt += skb_queue_len(txq);
tsk->snt_unacked += tsk->snd_backlog;
tsk->snd_backlog = 0;
tsk->expect_ack = true;
rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (rc == -ELINKCONG)
tsk->cong_link_cnt = 1;
......@@ -1322,8 +1352,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
return;
} else if (mtyp == CONN_ACK) {
was_cong = tsk_conn_cong(tsk);
tsk->expect_ack = false;
tipc_sk_push_backlog(tsk);
tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
......@@ -1516,6 +1545,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
struct sk_buff *skb;
u32 dnode = tsk_peer_node(tsk);
int maxnagle = tsk->maxnagle;
int maxpkt = tsk->max_pkt;
......@@ -1544,17 +1574,25 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
break;
send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
blocks = tsk->snd_backlog;
if (tsk->oneway++ >= 4 && send <= maxnagle) {
if (tsk->oneway++ >= tsk->nagle_start && send <= maxnagle) {
rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
if (unlikely(rc < 0))
break;
blocks += rc;
tsk->msg_acc++;
if (blocks <= 64 && tsk->expect_ack) {
tsk->snd_backlog = blocks;
sent += send;
break;
}
} else if (blocks > 64) {
tsk->pkt_cnt += skb_queue_len(txq);
} else {
skb = skb_peek_tail(txq);
msg_set_ack_required(buf_msg(skb));
tsk->expect_ack = true;
tsk->msg_acc = 0;
tsk->pkt_cnt = 0;
}
} else {
rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
if (unlikely(rc != send))
......@@ -2091,7 +2129,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
smp_wmb();
tsk->cong_link_cnt--;
wakeup = true;
tipc_sk_push_backlog(tsk);
tipc_sk_push_backlog(tsk, false);
break;
case GROUP_PROTOCOL:
tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
......@@ -2180,7 +2218,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
return false;
case TIPC_ESTABLISHED:
if (!skb_queue_empty(&sk->sk_write_queue))
tipc_sk_push_backlog(tsk);
tipc_sk_push_backlog(tsk, false);
/* Accept only connection-based messages sent by peer */
if (likely(con_msg && !err && pport == oport &&
pnode == onode)) {
......@@ -2188,9 +2226,11 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
struct sk_buff *skb;
skb = tipc_sk_build_ack(tsk);
if (skb)
if (skb) {
msg_set_nagle_ack(buf_msg(skb));
__skb_queue_tail(xmitq, skb);
}
}
return true;
}
if (!tsk_peer_msg(tsk, hdr))
......
......@@ -36,7 +36,7 @@
#include "core.h"
#include "trace.h"
#include "crypto.h"
#include "bcast.h"
#include <linux/sysctl.h>
static struct ctl_table_header *tipc_ctl_hdr;
......@@ -75,6 +75,13 @@ static struct ctl_table tipc_table[] = {
.extra1 = SYSCTL_ONE,
},
#endif
{
.procname = "bc_retruni",
.data = &sysctl_tipc_bc_retruni,
.maxlen = sizeof(sysctl_tipc_bc_retruni),
.mode = 0644,
.proc_handler = proc_doulongvec_minmax,
},
{}
};
......
......@@ -255,7 +255,7 @@ DECLARE_EVENT_CLASS(tipc_link_class,
TP_fast_assign(
__assign_str(header, header);
tipc_link_name_ext(l, __entry->name);
memcpy(__entry->name, tipc_link_name(l), TIPC_MAX_LINK_NAME);
tipc_link_dump(l, dqueues, __get_str(buf));
),
......@@ -295,12 +295,14 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
),
TP_fast_assign(
tipc_link_name_ext(r, __entry->name);
memcpy(__entry->name, tipc_link_name(r), TIPC_MAX_LINK_NAME);
__entry->from = f;
__entry->to = t;
__entry->len = skb_queue_len(tq);
__entry->fseqno = msg_seqno(buf_msg(skb_peek(tq)));
__entry->lseqno = msg_seqno(buf_msg(skb_peek_tail(tq)));
__entry->fseqno = __entry->len ?
msg_seqno(buf_msg(skb_peek(tq))) : 0;
__entry->lseqno = __entry->len ?
msg_seqno(buf_msg(skb_peek_tail(tq))) : 0;
),
TP_printk("<%s> retrans req: [%u-%u] transmq: %u [%u-%u]\n",
......@@ -308,15 +310,16 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
__entry->len, __entry->fseqno, __entry->lseqno)
);
DEFINE_EVENT(tipc_link_transmq_class, tipc_link_retrans,
DEFINE_EVENT_CONDITION(tipc_link_transmq_class, tipc_link_retrans,
TP_PROTO(struct tipc_link *r, u16 f, u16 t, struct sk_buff_head *tq),
TP_ARGS(r, f, t, tq)
TP_ARGS(r, f, t, tq),
TP_CONDITION(less_eq(f, t))
);
DEFINE_EVENT_PRINT(tipc_link_transmq_class, tipc_link_bc_ack,
TP_PROTO(struct tipc_link *r, u16 f, u16 t, struct sk_buff_head *tq),
TP_ARGS(r, f, t, tq),
TP_printk("<%s> acked: [%u-%u] transmq: %u [%u-%u]\n",
TP_printk("<%s> acked: %u gap: %u transmq: %u [%u-%u]\n",
__entry->name, __entry->from, __entry->to,
__entry->len, __entry->fseqno, __entry->lseqno)
);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment