Commit 7a6a6cbc authored by Paolo Abeni's avatar Paolo Abeni Committed by David S. Miller

mptcp: recvmsg() can drain data from multiple subflows

With the previous patch in place, the msk can detect which subflow
has the current map with a simple walk, let's update the main
loop to always select the 'current' subflow. The exit conditions now
closely mirror tcp_recvmsg() to get expected timeout and signal
behavior.
Co-developed-by: default avatarPeter Krystad <peter.krystad@linux.intel.com>
Signed-off-by: default avatarPeter Krystad <peter.krystad@linux.intel.com>
Co-developed-by: default avatarDavide Caratti <dcaratti@redhat.com>
Signed-off-by: default avatarDavide Caratti <dcaratti@redhat.com>
Co-developed-by: default avatarMatthieu Baerts <matthieu.baerts@tessares.net>
Signed-off-by: default avatarMatthieu Baerts <matthieu.baerts@tessares.net>
Co-developed-by: default avatarMat Martineau <mathew.j.martineau@linux.intel.com>
Signed-off-by: default avatarMat Martineau <mathew.j.martineau@linux.intel.com>
Co-developed-by: default avatarFlorian Westphal <fw@strlen.de>
Signed-off-by: default avatarFlorian Westphal <fw@strlen.de>
Signed-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
Signed-off-by: default avatarChristoph Paasch <cpaasch@apple.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 1891c4a0
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#include <linux/kernel.h> #include <linux/kernel.h>
#include <linux/module.h> #include <linux/module.h>
#include <linux/netdevice.h> #include <linux/netdevice.h>
#include <linux/sched/signal.h>
#include <linux/atomic.h>
#include <net/sock.h> #include <net/sock.h>
#include <net/inet_common.h> #include <net/inet_common.h>
#include <net/inet_hashtables.h> #include <net/inet_hashtables.h>
...@@ -105,6 +107,21 @@ static bool mptcp_ext_cache_refill(struct mptcp_sock *msk) ...@@ -105,6 +107,21 @@ static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
return !!msk->cached_ext; return !!msk->cached_ext;
} }
static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
{
struct mptcp_subflow_context *subflow;
struct sock *sk = (struct sock *)msk;
sock_owned_by_me(sk);
mptcp_for_each_subflow(msk, subflow) {
if (subflow->data_avail)
return mptcp_subflow_tcp_sock(subflow);
}
return NULL;
}
static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk, static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
struct msghdr *msg, long *timeo) struct msghdr *msg, long *timeo)
{ {
...@@ -269,13 +286,37 @@ int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb, ...@@ -269,13 +286,37 @@ int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
return copy_len; return copy_len;
} }
static void mptcp_wait_data(struct sock *sk, long *timeo)
{
DEFINE_WAIT_FUNC(wait, woken_wake_function);
struct mptcp_sock *msk = mptcp_sk(sk);
add_wait_queue(sk_sleep(sk), &wait);
sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
sk_wait_event(sk, timeo,
test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
remove_wait_queue(sk_sleep(sk), &wait);
}
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len) int nonblock, int flags, int *addr_len)
{ {
struct mptcp_sock *msk = mptcp_sk(sk); struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_subflow_context *subflow;
bool more_data_avail = false;
struct mptcp_read_arg arg;
read_descriptor_t desc;
bool wait_data = false;
struct socket *ssock; struct socket *ssock;
struct tcp_sock *tp;
bool done = false;
struct sock *ssk; struct sock *ssk;
int copied = 0; int copied = 0;
int target;
long timeo;
if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT)) if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
return -EOPNOTSUPP; return -EOPNOTSUPP;
...@@ -290,16 +331,124 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, ...@@ -290,16 +331,124 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
return copied; return copied;
} }
ssk = mptcp_subflow_get(msk); arg.msg = msg;
if (!ssk) { desc.arg.data = &arg;
release_sock(sk); desc.error = 0;
return -ENOTCONN;
timeo = sock_rcvtimeo(sk, nonblock);
len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
while (!done) {
u32 map_remaining;
int bytes_read;
ssk = mptcp_subflow_recv_lookup(msk);
pr_debug("msk=%p ssk=%p", msk, ssk);
if (!ssk)
goto wait_for_data;
subflow = mptcp_subflow_ctx(ssk);
tp = tcp_sk(ssk);
lock_sock(ssk);
do {
/* try to read as much data as available */
map_remaining = subflow->map_data_len -
mptcp_subflow_get_map_offset(subflow);
desc.count = min_t(size_t, len - copied, map_remaining);
pr_debug("reading %zu bytes, copied %d", desc.count,
copied);
bytes_read = tcp_read_sock(ssk, &desc,
mptcp_read_actor);
if (bytes_read < 0) {
if (!copied)
copied = bytes_read;
done = true;
goto next;
}
pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
msk->ack_seq + bytes_read);
msk->ack_seq += bytes_read;
copied += bytes_read;
if (copied >= len) {
done = true;
goto next;
}
if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
pr_err("Urgent data present, cannot proceed");
done = true;
goto next;
}
next:
more_data_avail = mptcp_subflow_data_available(ssk);
} while (more_data_avail && !done);
release_sock(ssk);
continue;
wait_for_data:
more_data_avail = false;
/* only the master socket status is relevant here. The exit
* conditions mirror closely tcp_recvmsg()
*/
if (copied >= target)
break;
if (copied) {
if (sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
!timeo ||
signal_pending(current))
break;
} else {
if (sk->sk_err) {
copied = sock_error(sk);
break;
}
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
if (sk->sk_state == TCP_CLOSE) {
copied = -ENOTCONN;
break;
}
if (!timeo) {
copied = -EAGAIN;
break;
}
if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}
pr_debug("block timeout %ld", timeo);
wait_data = true;
mptcp_wait_data(sk, &timeo);
} }
copied = sock_recvmsg(ssk->sk_socket, msg, flags); if (more_data_avail) {
if (!test_bit(MPTCP_DATA_READY, &msk->flags))
set_bit(MPTCP_DATA_READY, &msk->flags);
} else if (!wait_data) {
clear_bit(MPTCP_DATA_READY, &msk->flags);
release_sock(sk); /* .. race-breaker: ssk might get new data after last
* data_available() returns false.
*/
ssk = mptcp_subflow_recv_lookup(msk);
if (unlikely(ssk))
set_bit(MPTCP_DATA_READY, &msk->flags);
}
release_sock(sk);
return copied; return copied;
} }
...@@ -460,10 +609,6 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, ...@@ -460,10 +609,6 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
msk->write_seq = subflow->idsn + 1; msk->write_seq = subflow->idsn + 1;
ack_seq++; ack_seq++;
msk->ack_seq = ack_seq; msk->ack_seq = ack_seq;
subflow->map_seq = ack_seq;
subflow->map_subflow_seq = 1;
subflow->rel_write_seq = 1;
subflow->tcp_sock = ssk;
newsk = new_mptcp_sock; newsk = new_mptcp_sock;
mptcp_copy_inaddrs(newsk, ssk); mptcp_copy_inaddrs(newsk, ssk);
list_add(&subflow->node, &msk->conn_list); list_add(&subflow->node, &msk->conn_list);
...@@ -475,6 +620,19 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err, ...@@ -475,6 +620,19 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
bh_unlock_sock(new_mptcp_sock); bh_unlock_sock(new_mptcp_sock);
local_bh_enable(); local_bh_enable();
release_sock(sk); release_sock(sk);
/* the subflow can already receive packet, avoid racing with
* the receive path and process the pending ones
*/
lock_sock(ssk);
subflow->map_seq = ack_seq;
subflow->map_subflow_seq = 1;
subflow->rel_write_seq = 1;
subflow->tcp_sock = ssk;
subflow->conn = new_mptcp_sock;
if (unlikely(!skb_queue_empty(&ssk->sk_receive_queue)))
mptcp_subflow_data_available(ssk);
release_sock(ssk);
} }
return newsk; return newsk;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment