Commit e34b1638 authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: redesign socket-level flow control

The socket-level flow control in TIPC has long been due for a major
overhaul. This series fixes this.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 2b84af94 10724cc7
...@@ -112,11 +112,9 @@ static int __init tipc_init(void) ...@@ -112,11 +112,9 @@ static int __init tipc_init(void)
pr_info("Activated (version " TIPC_MOD_VER ")\n"); pr_info("Activated (version " TIPC_MOD_VER ")\n");
sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << sysctl_tipc_rmem[0] = RCVBUF_MIN;
TIPC_LOW_IMPORTANCE; sysctl_tipc_rmem[1] = RCVBUF_DEF;
sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 << sysctl_tipc_rmem[2] = RCVBUF_MAX;
TIPC_CRITICAL_IMPORTANCE;
sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
err = tipc_netlink_start(); err = tipc_netlink_start();
if (err) if (err)
......
...@@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) ...@@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n); msg_set_bits(m, 9, 16, 0xffff, n);
} }
static inline u32 msg_bcast_tag(struct tipc_msg *m) static inline u32 msg_conn_ack(struct tipc_msg *m)
{ {
return msg_bits(m, 9, 16, 0xffff); return msg_bits(m, 9, 16, 0xffff);
} }
static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n) static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
{ {
msg_set_bits(m, 9, 16, 0xffff, n); msg_set_bits(m, 9, 16, 0xffff, n);
} }
static inline u32 msg_adv_win(struct tipc_msg *m)
{
return msg_bits(m, 9, 0, 0xffff);
}
static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 9, 0, 0xffff, n);
}
static inline u32 msg_max_pkt(struct tipc_msg *m) static inline u32 msg_max_pkt(struct tipc_msg *m)
{ {
return msg_bits(m, 9, 16, 0xffff) * 4; return msg_bits(m, 9, 16, 0xffff) * 4;
......
/* /*
* net/tipc/node.c: TIPC node management routines * net/tipc/node.c: TIPC node management routines
* *
* Copyright (c) 2000-2006, 2012-2015, Ericsson AB * Copyright (c) 2000-2006, 2012-2016, Ericsson AB
* Copyright (c) 2005-2006, 2010-2014, Wind River Systems * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
* All rights reserved. * All rights reserved.
* *
...@@ -191,6 +191,20 @@ int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel) ...@@ -191,6 +191,20 @@ int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
tipc_node_put(n); tipc_node_put(n);
return mtu; return mtu;
} }
u16 tipc_node_get_capabilities(struct net *net, u32 addr)
{
struct tipc_node *n;
u16 caps;
n = tipc_node_find(net, addr);
if (unlikely(!n))
return TIPC_NODE_CAPABILITIES;
caps = n->capabilities;
tipc_node_put(n);
return caps;
}
/* /*
* A trivial power-of-two bitmask technique is used for speed, since this * A trivial power-of-two bitmask technique is used for speed, since this
* operation is done for every incoming TIPC packet. The number of hash table * operation is done for every incoming TIPC packet. The number of hash table
...@@ -304,8 +318,11 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) ...@@ -304,8 +318,11 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
spin_lock_bh(&tn->node_list_lock); spin_lock_bh(&tn->node_list_lock);
n = tipc_node_find(net, addr); n = tipc_node_find(net, addr);
if (n) if (n) {
/* Same node may come back with new capabilities */
n->capabilities = capabilities;
goto exit; goto exit;
}
n = kzalloc(sizeof(*n), GFP_ATOMIC); n = kzalloc(sizeof(*n), GFP_ATOMIC);
if (!n) { if (!n) {
pr_warn("Node creation failed, no memory\n"); pr_warn("Node creation failed, no memory\n");
......
...@@ -45,10 +45,11 @@ ...@@ -45,10 +45,11 @@
/* Optional capabilities supported by this code version /* Optional capabilities supported by this code version
*/ */
enum { enum {
TIPC_BCAST_SYNCH = (1 << 1) TIPC_BCAST_SYNCH = (1 << 1),
TIPC_BLOCK_FLOWCTL = (2 << 1)
}; };
#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1 #define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net); void tipc_node_stop(struct net *net);
...@@ -70,6 +71,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb); ...@@ -70,6 +71,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel); int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
u16 tipc_node_get_capabilities(struct net *net, u32 addr);
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb); int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info); int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info);
......
...@@ -96,8 +96,11 @@ struct tipc_sock { ...@@ -96,8 +96,11 @@ struct tipc_sock {
uint conn_timeout; uint conn_timeout;
atomic_t dupl_rcvcnt; atomic_t dupl_rcvcnt;
bool link_cong; bool link_cong;
uint sent_unacked; u16 snt_unacked;
uint rcv_unacked; u16 snd_win;
u16 peer_caps;
u16 rcv_unacked;
u16 rcv_win;
struct sockaddr_tipc remote; struct sockaddr_tipc remote;
struct rhash_head node; struct rhash_head node;
struct rcu_head rcu; struct rcu_head rcu;
...@@ -227,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk) ...@@ -227,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
return container_of(sk, struct tipc_sock, sk); return container_of(sk, struct tipc_sock, sk);
} }
static int tsk_conn_cong(struct tipc_sock *tsk) static bool tsk_conn_cong(struct tipc_sock *tsk)
{ {
return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; return tsk->snt_unacked >= tsk->snd_win;
}
/* tsk_blocks(): translate a buffer size in bytes to number of
* advertisable blocks, taking into account the ratio truesize(len)/len
* We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
*/
static u16 tsk_adv_blocks(int len)
{
return len / FLOWCTL_BLK_SZ / 4;
}
/* tsk_inc(): increment counter for sent or received data
* - If block based flow control is not supported by peer we
* fall back to message based ditto, incrementing the counter
*/
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
return ((msglen / FLOWCTL_BLK_SZ) + 1);
return 1;
} }
/** /**
...@@ -377,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, ...@@ -377,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_write_space = tipc_write_space; sk->sk_write_space = tipc_write_space;
sk->sk_destruct = tipc_sock_destruct; sk->sk_destruct = tipc_sock_destruct;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0); atomic_set(&tsk->dupl_rcvcnt, 0);
/* Start out with safe limits until we receive an advertised window */
tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
tsk->rcv_win = tsk->snd_win;
if (sock->state == SS_READY) { if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true); tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM) if (sock->type == SOCK_DGRAM)
...@@ -775,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb) ...@@ -775,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct tipc_msg *hdr = buf_msg(skb); struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr); int mtyp = msg_type(hdr);
int conn_cong; bool conn_cong;
/* Ignore if connection cannot be validated: */ /* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr)) if (!tsk_peer_msg(tsk, hdr))
...@@ -789,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb) ...@@ -789,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
return; return;
} else if (mtyp == CONN_ACK) { } else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk); conn_cong = tsk_conn_cong(tsk);
tsk->sent_unacked -= msg_msgcnt(hdr); tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
if (conn_cong) if (conn_cong)
sk->sk_write_space(sk); sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) { } else if (mtyp != CONN_PROBE_REPLY) {
...@@ -1020,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) ...@@ -1020,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
u32 dnode; u32 dnode;
uint mtu, send, sent = 0; uint mtu, send, sent = 0;
struct iov_iter save; struct iov_iter save;
int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */ /* Handle implied connection establishment */
if (unlikely(dest)) { if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz); rc = __tipc_sendmsg(sock, m, dsz);
hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc)) if (dsz && (dsz == rc))
tsk->sent_unacked = 1; tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc; return rc;
} }
if (dsz > (uint)INT_MAX) if (dsz > (uint)INT_MAX)
...@@ -1054,7 +1084,7 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) ...@@ -1054,7 +1084,7 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
if (likely(!tsk_conn_cong(tsk))) { if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid); rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) { if (likely(!rc)) {
tsk->sent_unacked++; tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send; sent += send;
if (sent == dsz) if (sent == dsz)
return dsz; return dsz;
...@@ -1118,6 +1148,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, ...@@ -1118,6 +1148,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv); sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
return;
/* Fall back to message based flow control */
tsk->rcv_win = FLOWCTL_MSG_WIN;
tsk->snd_win = FLOWCTL_MSG_WIN;
} }
/** /**
...@@ -1214,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, ...@@ -1214,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0; return 0;
} }
static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) static void tipc_sk_send_ack(struct tipc_sock *tsk)
{ {
struct net *net = sock_net(&tsk->sk); struct net *net = sock_net(&tsk->sk);
struct sk_buff *skb = NULL; struct sk_buff *skb = NULL;
...@@ -1230,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) ...@@ -1230,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!skb) if (!skb)
return; return;
msg = buf_msg(skb); msg = buf_msg(skb);
msg_set_msgcnt(msg, ack); msg_set_conn_ack(msg, tsk->rcv_unacked);
tsk->rcv_unacked = 0;
/* Adjust to and advertize the correct window limit */
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
msg_set_adv_win(msg, tsk->rcv_win);
}
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg)); tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
} }
...@@ -1288,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, ...@@ -1288,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
long timeo; long timeo;
unsigned int sz; unsigned int sz;
u32 err; u32 err;
int res; int res, hlen;
/* Catch invalid receive requests */ /* Catch invalid receive requests */
if (unlikely(!buf_len)) if (unlikely(!buf_len))
...@@ -1313,6 +1357,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, ...@@ -1313,6 +1357,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
buf = skb_peek(&sk->sk_receive_queue); buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf); msg = buf_msg(buf);
sz = msg_data_sz(msg); sz = msg_data_sz(msg);
hlen = msg_hdr_sz(msg);
err = msg_errcode(msg); err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */ /* Discard an empty non-errored message & try again */
...@@ -1335,7 +1380,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, ...@@ -1335,7 +1380,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
sz = buf_len; sz = buf_len;
m->msg_flags |= MSG_TRUNC; m->msg_flags |= MSG_TRUNC;
} }
res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz); res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res) if (res)
goto exit; goto exit;
res = sz; res = sz;
...@@ -1347,15 +1392,15 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, ...@@ -1347,15 +1392,15 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
res = -ECONNRESET; res = -ECONNRESET;
} }
/* Consume received message (optional) */ if (unlikely(flags & MSG_PEEK))
if (likely(!(flags & MSG_PEEK))) { goto exit;
if ((sock->state != SS_READY) &&
(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { if (likely(sock->state != SS_READY)) {
tipc_sk_send_ack(tsk, tsk->rcv_unacked); tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
tsk->rcv_unacked = 0; if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
} tipc_sk_send_ack(tsk);
tsk_advance_rx_queue(sk);
} }
tsk_advance_rx_queue(sk);
exit: exit:
release_sock(sk); release_sock(sk);
return res; return res;
...@@ -1384,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m, ...@@ -1384,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
int sz_to_copy, target, needed; int sz_to_copy, target, needed;
int sz_copied = 0; int sz_copied = 0;
u32 err; u32 err;
int res = 0; int res = 0, hlen;
/* Catch invalid receive attempts */ /* Catch invalid receive attempts */
if (unlikely(!buf_len)) if (unlikely(!buf_len))
...@@ -1410,6 +1455,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m, ...@@ -1410,6 +1455,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
buf = skb_peek(&sk->sk_receive_queue); buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf); msg = buf_msg(buf);
sz = msg_data_sz(msg); sz = msg_data_sz(msg);
hlen = msg_hdr_sz(msg);
err = msg_errcode(msg); err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */ /* Discard an empty non-errored message & try again */
...@@ -1434,8 +1480,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m, ...@@ -1434,8 +1480,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
needed = (buf_len - sz_copied); needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed; sz_to_copy = (sz <= needed) ? sz : needed;
res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset, res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
m, sz_to_copy);
if (res) if (res)
goto exit; goto exit;
...@@ -1457,20 +1502,18 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m, ...@@ -1457,20 +1502,18 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
res = -ECONNRESET; res = -ECONNRESET;
} }
/* Consume received message (optional) */ if (unlikely(flags & MSG_PEEK))
if (likely(!(flags & MSG_PEEK))) { goto exit;
if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
tipc_sk_send_ack(tsk, tsk->rcv_unacked); tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
tsk->rcv_unacked = 0; if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
} tipc_sk_send_ack(tsk);
tsk_advance_rx_queue(sk); tsk_advance_rx_queue(sk);
}
/* Loop around if more data is required */ /* Loop around if more data is required */
if ((sz_copied < buf_len) && /* didn't get all requested data */ if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) || (!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */ (sz_copied < target)) && /* and more is ready or required */
(!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
(!err)) /* and haven't reached a FIN */ (!err)) /* and haven't reached a FIN */
goto restart; goto restart;
...@@ -1602,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) ...@@ -1602,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
/** /**
* rcvbuf_limit - get proper overload limit of socket receive queue * rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket * @sk: socket
* @buf: message * @skb: message
* *
* For all connection oriented messages, irrespective of importance, * For connection oriented messages, irrespective of importance,
* the default overload value (i.e. 67MB) is set as limit. * default queue limit is 2 MB.
* *
* For all connectionless messages, by default new queue limits are * For connectionless messages, queue limits are based on message
* as belows: * importance as follows:
* *
* TIPC_LOW_IMPORTANCE (4 MB) * TIPC_LOW_IMPORTANCE (2 MB)
* TIPC_MEDIUM_IMPORTANCE (8 MB) * TIPC_MEDIUM_IMPORTANCE (4 MB)
* TIPC_HIGH_IMPORTANCE (16 MB) * TIPC_HIGH_IMPORTANCE (8 MB)
* TIPC_CRITICAL_IMPORTANCE (32 MB) * TIPC_CRITICAL_IMPORTANCE (16 MB)
* *
* Returns overload limit according to corresponding message importance * Returns overload limit according to corresponding message importance
*/ */
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{ {
struct tipc_msg *msg = buf_msg(buf); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
if (unlikely(!msg_connected(hdr)))
return sk->sk_rcvbuf << msg_importance(hdr);
if (msg_connected(msg)) if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
return sysctl_tipc_rmem[2]; return sk->sk_rcvbuf;
return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << return FLOWCTL_MSG_LIM;
msg_importance(msg);
} }
/** /**
...@@ -1748,7 +1794,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, ...@@ -1748,7 +1794,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Try backlog, compensating for double-counted bytes */ /* Try backlog, compensating for double-counted bytes */
dcnt = &tipc_sk(sk)->dupl_rcvcnt; dcnt = &tipc_sk(sk)->dupl_rcvcnt;
if (sk->sk_backlog.len) if (!sk->sk_backlog.len)
atomic_set(dcnt, 0); atomic_set(dcnt, 0);
lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
if (likely(!sk_add_backlog(sk, skb, lim))) if (likely(!sk_add_backlog(sk, skb, lim)))
......
/* net/tipc/socket.h: Include file for TIPC socket code /* net/tipc/socket.h: Include file for TIPC socket code
* *
* Copyright (c) 2014-2015, Ericsson AB * Copyright (c) 2014-2016, Ericsson AB
* All rights reserved. * All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
...@@ -38,10 +38,17 @@ ...@@ -38,10 +38,17 @@
#include <net/sock.h> #include <net/sock.h>
#include <net/genetlink.h> #include <net/genetlink.h>
#define TIPC_CONNACK_INTV 256 /* Compatibility values for deprecated message based flow control */
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) #define FLOWCTL_MSG_WIN 512
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ #define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
#define FLOWCTL_BLK_SZ 1024
/* Socket receive buffer sizes */
#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)
int tipc_socket_init(void); int tipc_socket_init(void);
void tipc_socket_stop(void); void tipc_socket_stop(void);
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment