Commit 69cad59d authored by David S. Miller's avatar David S. Miller

Merge branch 'tipc-fixes'

Tuong Lien says:

====================
tipc: add some patches

This series adds patches to fix some issues in TIPC streaming & service
subscription.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents f3fbc5a3 88690b10
...@@ -1739,22 +1739,21 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb, ...@@ -1739,22 +1739,21 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
return 0; return 0;
} }
static void tipc_sk_send_ack(struct tipc_sock *tsk) static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
{ {
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk);
struct sk_buff *skb = NULL; struct sk_buff *skb = NULL;
struct tipc_msg *msg; struct tipc_msg *msg;
u32 peer_port = tsk_peer_port(tsk); u32 peer_port = tsk_peer_port(tsk);
u32 dnode = tsk_peer_node(tsk); u32 dnode = tsk_peer_node(tsk);
if (!tipc_sk_connected(sk)) if (!tipc_sk_connected(sk))
return; return NULL;
skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
dnode, tsk_own_node(tsk), peer_port, dnode, tsk_own_node(tsk), peer_port,
tsk->portid, TIPC_OK); tsk->portid, TIPC_OK);
if (!skb) if (!skb)
return; return NULL;
msg = buf_msg(skb); msg = buf_msg(skb);
msg_set_conn_ack(msg, tsk->rcv_unacked); msg_set_conn_ack(msg, tsk->rcv_unacked);
tsk->rcv_unacked = 0; tsk->rcv_unacked = 0;
...@@ -1764,7 +1763,19 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk) ...@@ -1764,7 +1763,19 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk)
tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf); tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
msg_set_adv_win(msg, tsk->rcv_win); msg_set_adv_win(msg, tsk->rcv_win);
} }
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg)); return skb;
}
static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct sk_buff *skb;
skb = tipc_sk_build_ack(tsk);
if (!skb)
return;
tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
msg_link_selector(buf_msg(skb)));
} }
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
...@@ -1938,7 +1949,6 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, ...@@ -1938,7 +1949,6 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
bool peek = flags & MSG_PEEK; bool peek = flags & MSG_PEEK;
int offset, required, copy, copied = 0; int offset, required, copy, copied = 0;
int hlen, dlen, err, rc; int hlen, dlen, err, rc;
bool ack = false;
long timeout; long timeout;
/* Catch invalid receive attempts */ /* Catch invalid receive attempts */
...@@ -1983,7 +1993,6 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, ...@@ -1983,7 +1993,6 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
/* Copy data if msg ok, otherwise return error/partial data */ /* Copy data if msg ok, otherwise return error/partial data */
if (likely(!err)) { if (likely(!err)) {
ack = msg_ack_required(hdr);
offset = skb_cb->bytes_read; offset = skb_cb->bytes_read;
copy = min_t(int, dlen - offset, buflen - copied); copy = min_t(int, dlen - offset, buflen - copied);
rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy); rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
...@@ -2011,7 +2020,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, ...@@ -2011,7 +2020,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
/* Send connection flow control advertisement when applicable */ /* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk); tipc_sk_send_ack(tsk);
/* Exit if all requested data or FIN/error received */ /* Exit if all requested data or FIN/error received */
...@@ -2105,9 +2114,11 @@ static void tipc_sk_proto_rcv(struct sock *sk, ...@@ -2105,9 +2114,11 @@ static void tipc_sk_proto_rcv(struct sock *sk,
* tipc_sk_filter_connect - check incoming message for a connection-based socket * tipc_sk_filter_connect - check incoming message for a connection-based socket
* @tsk: TIPC socket * @tsk: TIPC socket
* @skb: pointer to message buffer. * @skb: pointer to message buffer.
* @xmitq: for Nagle ACK if any
* Returns true if message should be added to receive queue, false otherwise * Returns true if message should be added to receive queue, false otherwise
*/ */
static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{ {
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
...@@ -2171,8 +2182,17 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) ...@@ -2171,8 +2182,17 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
if (!skb_queue_empty(&sk->sk_write_queue)) if (!skb_queue_empty(&sk->sk_write_queue))
tipc_sk_push_backlog(tsk); tipc_sk_push_backlog(tsk);
/* Accept only connection-based messages sent by peer */ /* Accept only connection-based messages sent by peer */
if (likely(con_msg && !err && pport == oport && pnode == onode)) if (likely(con_msg && !err && pport == oport &&
pnode == onode)) {
if (msg_ack_required(hdr)) {
struct sk_buff *skb;
skb = tipc_sk_build_ack(tsk);
if (skb)
__skb_queue_tail(xmitq, skb);
}
return true; return true;
}
if (!tsk_peer_msg(tsk, hdr)) if (!tsk_peer_msg(tsk, hdr))
return false; return false;
if (!err) if (!err)
...@@ -2267,7 +2287,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, ...@@ -2267,7 +2287,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
while ((skb = __skb_dequeue(&inputq))) { while ((skb = __skb_dequeue(&inputq))) {
hdr = buf_msg(skb); hdr = buf_msg(skb);
limit = rcvbuf_limit(sk, skb); limit = rcvbuf_limit(sk, skb);
if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
(!sk_conn && msg_connected(hdr)) || (!sk_conn && msg_connected(hdr)) ||
(!grp && msg_in_group(hdr))) (!grp && msg_in_group(hdr)))
err = TIPC_ERR_NO_PORT; err = TIPC_ERR_NO_PORT;
......
...@@ -96,6 +96,16 @@ void tipc_sub_get(struct tipc_subscription *subscription); ...@@ -96,6 +96,16 @@ void tipc_sub_get(struct tipc_subscription *subscription);
(swap_ ? swab32(val__) : val__); \ (swap_ ? swab32(val__) : val__); \
}) })
/* tipc_sub_write - write val_ to field_ of struct sub_ in user endian format
*/
#define tipc_sub_write(sub_, field_, val_) \
({ \
struct tipc_subscr *sub__ = sub_; \
u32 val__ = val_; \
int swap_ = !((sub__)->filter & TIPC_FILTER_MASK); \
(sub__)->field_ = swap_ ? swab32(val__) : val__; \
})
/* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format /* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format
*/ */
#define tipc_evt_write(evt_, field_, val_) \ #define tipc_evt_write(evt_, field_, val_) \
......
...@@ -237,8 +237,8 @@ static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s) ...@@ -237,8 +237,8 @@ static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) { if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
tipc_sub_unsubscribe(sub); tipc_sub_unsubscribe(sub);
atomic_dec(&tn->subscription_count); atomic_dec(&tn->subscription_count);
} else if (s) { if (s)
break; break;
} }
} }
spin_unlock_bh(&con->sub_lock); spin_unlock_bh(&con->sub_lock);
...@@ -362,9 +362,10 @@ static int tipc_conn_rcv_sub(struct tipc_topsrv *srv, ...@@ -362,9 +362,10 @@ static int tipc_conn_rcv_sub(struct tipc_topsrv *srv,
{ {
struct tipc_net *tn = tipc_net(srv->net); struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscription *sub; struct tipc_subscription *sub;
u32 s_filter = tipc_sub_read(s, filter);
if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) { if (s_filter & TIPC_SUB_CANCEL) {
s->filter &= __constant_ntohl(~TIPC_SUB_CANCEL); tipc_sub_write(s, filter, s_filter & ~TIPC_SUB_CANCEL);
tipc_conn_delete_sub(con, s); tipc_conn_delete_sub(con, s);
return 0; return 0;
} }
...@@ -400,7 +401,9 @@ static int tipc_conn_rcv_from_sock(struct tipc_conn *con) ...@@ -400,7 +401,9 @@ static int tipc_conn_rcv_from_sock(struct tipc_conn *con)
return -EWOULDBLOCK; return -EWOULDBLOCK;
if (ret == sizeof(s)) { if (ret == sizeof(s)) {
read_lock_bh(&sk->sk_callback_lock); read_lock_bh(&sk->sk_callback_lock);
ret = tipc_conn_rcv_sub(srv, con, &s); /* RACE: the connection can be closed in the meantime */
if (likely(connected(con)))
ret = tipc_conn_rcv_sub(srv, con, &s);
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
if (!ret) if (!ret)
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment