Commit 64ac5f59 authored by Jon Maloy's avatar Jon Maloy Committed by David S. Miller

tipc: refactor function filter_rcv()

In the following commits we will need to handle multiple incoming and
rejected/returned buffers in the function socket.c::filter_rcv().
As a preparation for this, we generalize the function by handling
buffer queues instead of individual buffers. We also introduce a
help function tipc_skb_reject(), and rename filter_rcv() to
tipc_sk_filter_rcv() in line with other functions in socket.c.
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 38077b8e
...@@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, ...@@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
} }
kfree_skb(skb); kfree_skb(skb);
} }
void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
struct sk_buff_head *xmitq)
{
if (tipc_msg_reverse(tipc_own_addr(net), &skb, err))
__skb_queue_tail(xmitq, skb);
}
...@@ -819,6 +819,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr) ...@@ -819,6 +819,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr)
struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp); struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp);
bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_validate(struct sk_buff *skb);
bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
struct sk_buff_head *xmitq);
void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
u32 hsize, u32 destnode); u32 hsize, u32 destnode);
struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
......
...@@ -111,7 +111,7 @@ struct tipc_sock { ...@@ -111,7 +111,7 @@ struct tipc_sock {
struct rcu_head rcu; struct rcu_head rcu;
}; };
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk); static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk); static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk); static void tipc_sock_destruct(struct sock *sk);
...@@ -453,7 +453,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, ...@@ -453,7 +453,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
msg_set_origport(msg, tsk->portid); msg_set_origport(msg, tsk->portid);
setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
sk->sk_shutdown = 0; sk->sk_shutdown = 0;
sk->sk_backlog_rcv = tipc_backlog_rcv; sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_rcvbuf = sysctl_tipc_rmem[1];
sk->sk_data_ready = tipc_data_ready; sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space; sk->sk_write_space = tipc_write_space;
...@@ -850,16 +850,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, ...@@ -850,16 +850,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
} }
/** /**
* tipc_sk_proto_rcv - receive a connection mng protocol message * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket * @tsk: receiving socket
* @skb: pointer to message buffer. * @skb: pointer to message buffer.
*/ */
static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
struct sock *sk = &tsk->sk;
u32 onode = tsk_own_node(tsk);
struct tipc_msg *hdr = buf_msg(skb); struct tipc_msg *hdr = buf_msg(skb);
u32 onode = tsk_own_node(tsk);
struct sock *sk = &tsk->sk;
int mtyp = msg_type(hdr); int mtyp = msg_type(hdr);
bool conn_cong; bool conn_cong;
...@@ -1536,14 +1536,41 @@ static void tipc_sock_destruct(struct sock *sk) ...@@ -1536,14 +1536,41 @@ static void tipc_sock_destruct(struct sock *sk)
__skb_queue_purge(&sk->sk_receive_queue); __skb_queue_purge(&sk->sk_receive_queue);
} }
static void tipc_sk_proto_rcv(struct sock *sk,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = __skb_dequeue(inputq);
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb);
switch (msg_user(hdr)) {
case CONN_MANAGER:
tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
return;
case SOCK_WAKEUP:
u32_del(&tsk->cong_links, msg_orignode(hdr));
tsk->cong_link_cnt--;
sk->sk_write_space(sk);
break;
case TOP_SRV:
tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
break;
default:
break;
}
kfree_skb(skb);
}
/** /**
* filter_connect - Handle all incoming messages for a connection-based socket * tipc_filter_connect - Handle incoming message for a connection-based socket
* @tsk: TIPC socket * @tsk: TIPC socket
* @skb: pointer to message buffer. Set to NULL if buffer is consumed * @skb: pointer to message buffer. Set to NULL if buffer is consumed
* *
* Returns true if everything ok, false otherwise * Returns true if everything ok, false otherwise
*/ */
static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
{ {
struct sock *sk = &tsk->sk; struct sock *sk = &tsk->sk;
struct net *net = sock_net(sk); struct net *net = sock_net(sk);
...@@ -1657,7 +1684,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) ...@@ -1657,7 +1684,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
} }
/** /**
* filter_rcv - validate incoming message * tipc_sk_filter_rcv - validate incoming message
* @sk: socket * @sk: socket
* @skb: pointer to message. * @skb: pointer to message.
* *
...@@ -1666,75 +1693,49 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) ...@@ -1666,75 +1693,49 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
* *
* Called with socket lock already taken * Called with socket lock already taken
* *
* Returns true if message was added to socket receive queue, otherwise false
*/ */
static bool filter_rcv(struct sock *sk, struct sk_buff *skb, static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
bool sk_conn = !tipc_sk_type_connectionless(sk);
struct tipc_sock *tsk = tipc_sk(sk); struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = buf_msg(skb); struct tipc_msg *hdr = buf_msg(skb);
unsigned int limit = rcvbuf_limit(sk, skb); struct net *net = sock_net(sk);
int err = TIPC_OK; struct sk_buff_head inputq;
int limit, err = TIPC_OK;
if (unlikely(!msg_isdata(hdr))) { TIPC_SKB_CB(skb)->bytes_read = 0;
switch (msg_user(hdr)) { __skb_queue_head_init(&inputq);
case CONN_MANAGER: __skb_queue_tail(&inputq, skb);
tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
case SOCK_WAKEUP:
u32_del(&tsk->cong_links, msg_orignode(hdr));
tsk->cong_link_cnt--;
sk->sk_write_space(sk);
break;
case TOP_SRV:
tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
break;
default:
break;
}
kfree_skb(skb);
return false;
}
/* Drop if illegal message type */ if (unlikely(!msg_isdata(hdr)))
if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { tipc_sk_proto_rcv(sk, &inputq, xmitq);
kfree_skb(skb); else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG))
return false; return kfree_skb(skb);
}
/* Reject if wrong message type for current socket state */ /* Validate and add to receive buffer if there is space */
if (tipc_sk_type_connectionless(sk)) { while ((skb = __skb_dequeue(&inputq))) {
if (msg_connected(hdr)) { hdr = buf_msg(skb);
err = TIPC_ERR_NO_PORT; limit = rcvbuf_limit(sk, skb);
goto reject; if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
} (!sk_conn && msg_connected(hdr)))
} else if (unlikely(!filter_connect(tsk, skb))) {
err = TIPC_ERR_NO_PORT; err = TIPC_ERR_NO_PORT;
goto reject; else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
}
/* Reject message if there isn't room to queue it */
if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
err = TIPC_ERR_OVERLOAD; err = TIPC_ERR_OVERLOAD;
goto reject;
}
/* Enqueue message */ if (unlikely(err)) {
TIPC_SKB_CB(skb)->bytes_read = 0; tipc_skb_reject(net, err, skb, xmitq);
err = TIPC_OK;
continue;
}
__skb_queue_tail(&sk->sk_receive_queue, skb); __skb_queue_tail(&sk->sk_receive_queue, skb);
skb_set_owner_r(skb, sk); skb_set_owner_r(skb, sk);
sk->sk_data_ready(sk); sk->sk_data_ready(sk);
return true; }
reject:
if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
__skb_queue_tail(xmitq, skb);
return false;
} }
/** /**
* tipc_backlog_rcv - handle incoming message from backlog queue * tipc_sk_backlog_rcv - handle incoming message from backlog queue
* @sk: socket * @sk: socket
* @skb: message * @skb: message
* *
...@@ -1742,27 +1743,25 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb, ...@@ -1742,27 +1743,25 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
* *
* Returns 0 * Returns 0
*/ */
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{ {
unsigned int truesize = skb->truesize; unsigned int before = sk_rmem_alloc_get(sk);
struct sk_buff_head xmitq; struct sk_buff_head xmitq;
u32 dnode, selector; u32 dnode, selector;
unsigned int added;
__skb_queue_head_init(&xmitq); __skb_queue_head_init(&xmitq);
if (likely(filter_rcv(sk, skb, &xmitq))) { tipc_sk_filter_rcv(sk, skb, &xmitq);
atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); added = sk_rmem_alloc_get(sk) - before;
return 0; atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
}
if (skb_queue_empty(&xmitq)) /* Send pending response/rejected messages, if any */
return 0; while ((skb = __skb_dequeue(&xmitq))) {
/* Send response/rejected message */
skb = __skb_dequeue(&xmitq);
dnode = msg_destnode(buf_msg(skb));
selector = msg_origport(buf_msg(skb)); selector = msg_origport(buf_msg(skb));
dnode = msg_destnode(buf_msg(skb));
tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
}
return 0; return 0;
} }
...@@ -1794,7 +1793,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, ...@@ -1794,7 +1793,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Add message directly to receive queue if possible */ /* Add message directly to receive queue if possible */
if (!sock_owned_by_user(sk)) { if (!sock_owned_by_user(sk)) {
filter_rcv(sk, skb, xmitq); tipc_sk_filter_rcv(sk, skb, xmitq);
continue; continue;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment