Commit 5f3e915c authored by Jakub Kicinski's avatar Jakub Kicinski

Merge branch 'mptcp-avoid-workqueue-usage-for-data'

Paolo Abeni says:

====================
mptcp: avoid workqueue usage for data

The current locking schema used to protect the MPTCP data-path
requires the usage of the MPTCP workqueue to process the incoming
data, depending on trylock result.

The above poses scalability limits and introduces random delays
in MPTCP-level acks.

With this series we use a single spinlock to protect the MPTCP
data-path, removing the need for workqueue and delayed ack usage.

This additionally reduces the number of atomic operations required
per packet and cleans-up considerably the poll/wake-up code.
====================

Link: https://lore.kernel.org/r/cover.1606413118.git.pabeni@redhat.comSigned-off-by: default avatarJakub Kicinski <kuba@kernel.org>
parents be572424 6e628cd3
......@@ -1590,6 +1590,7 @@ static inline void lock_sock(struct sock *sk)
lock_sock_nested(sk, 0);
}
void __lock_sock(struct sock *sk);
void __release_sock(struct sock *sk);
void release_sock(struct sock *sk);
......
......@@ -2486,7 +2486,7 @@ bool sk_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
}
EXPORT_SYMBOL(sk_page_frag_refill);
static void __lock_sock(struct sock *sk)
void __lock_sock(struct sock *sk)
__releases(&sk->sk_lock.slock)
__acquires(&sk->sk_lock.slock)
{
......
......@@ -140,7 +140,7 @@ static void mptcp_diag_get_info(struct sock *sk, struct inet_diag_msg *r,
info->mptcpi_flags = flags;
info->mptcpi_token = READ_ONCE(msk->token);
info->mptcpi_write_seq = READ_ONCE(msk->write_seq);
info->mptcpi_snd_una = atomic64_read(&msk->snd_una);
info->mptcpi_snd_una = READ_ONCE(msk->snd_una);
info->mptcpi_rcv_nxt = READ_ONCE(msk->ack_seq);
unlock_sock_fast(sk, slow);
}
......
......@@ -830,18 +830,20 @@ static u64 expand_ack(u64 old_ack, u64 cur_ack, bool use_64bit)
}
static void ack_update_msk(struct mptcp_sock *msk,
const struct sock *ssk,
struct sock *ssk,
struct mptcp_options_received *mp_opt)
{
u64 new_snd_una, snd_una, old_snd_una = atomic64_read(&msk->snd_una);
u64 new_wnd_end, wnd_end, old_wnd_end = atomic64_read(&msk->wnd_end);
u64 snd_nxt = READ_ONCE(msk->snd_nxt);
u64 new_wnd_end, new_snd_una, snd_nxt = READ_ONCE(msk->snd_nxt);
struct sock *sk = (struct sock *)msk;
u64 old_snd_una;
mptcp_data_lock(sk);
/* avoid ack expansion on update conflict, to reduce the risk of
* wrongly expanding to a future ack sequence number, which is way
* more dangerous than missing an ack
*/
old_snd_una = msk->snd_una;
new_snd_una = expand_ack(old_snd_una, mp_opt->data_ack, mp_opt->ack64);
/* ACK for data not even sent yet? Ignore. */
......@@ -850,26 +852,16 @@ static void ack_update_msk(struct mptcp_sock *msk,
new_wnd_end = new_snd_una + tcp_sk(ssk)->snd_wnd;
while (after64(new_wnd_end, old_wnd_end)) {
wnd_end = old_wnd_end;
old_wnd_end = atomic64_cmpxchg(&msk->wnd_end, wnd_end,
new_wnd_end);
if (old_wnd_end == wnd_end) {
if (mptcp_send_head(sk))
mptcp_schedule_work(sk);
break;
}
if (after64(new_wnd_end, msk->wnd_end)) {
msk->wnd_end = new_wnd_end;
__mptcp_wnd_updated(sk, ssk);
}
while (after64(new_snd_una, old_snd_una)) {
snd_una = old_snd_una;
old_snd_una = atomic64_cmpxchg(&msk->snd_una, snd_una,
new_snd_una);
if (old_snd_una == snd_una) {
mptcp_data_acked(sk);
break;
}
if (after64(new_snd_una, old_snd_una)) {
msk->snd_una = new_snd_una;
__mptcp_data_acked(sk);
}
mptcp_data_unlock(sk);
}
bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit)
......@@ -922,8 +914,19 @@ void mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
struct mptcp_options_received mp_opt;
struct mptcp_ext *mpext;
if (__mptcp_check_fallback(msk))
if (__mptcp_check_fallback(msk)) {
/* Keep it simple and unconditionally trigger send data cleanup and
* pending queue spooling. We will need to acquire the data lock
* for more accurate checks, and once the lock is acquired, such
* helpers are cheap.
*/
mptcp_data_lock(subflow->conn);
if (mptcp_send_head(subflow->conn))
__mptcp_wnd_updated(subflow->conn, sk);
__mptcp_data_acked(subflow->conn);
mptcp_data_unlock(subflow->conn);
return;
}
mptcp_get_options(skb, &mp_opt);
if (!check_fully_established(msk, sk, subflow, skb, &mp_opt))
......
......@@ -60,7 +60,7 @@ static struct socket *__mptcp_nmpc_socket(const struct mptcp_sock *msk)
/* Returns end sequence number of the receiver's advertised window */
static u64 mptcp_wnd_end(const struct mptcp_sock *msk)
{
return atomic64_read(&msk->wnd_end);
return READ_ONCE(msk->wnd_end);
}
static bool mptcp_is_tcpsk(struct sock *sk)
......@@ -348,17 +348,22 @@ static void mptcp_close_wake_up(struct sock *sk)
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
}
static void mptcp_check_data_fin_ack(struct sock *sk)
static bool mptcp_pending_data_fin_ack(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (__mptcp_check_fallback(msk))
return;
return !__mptcp_check_fallback(msk) &&
((1 << sk->sk_state) &
(TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
msk->write_seq == READ_ONCE(msk->snd_una);
}
static void mptcp_check_data_fin_ack(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
/* Look for an acknowledged DATA_FIN */
if (((1 << sk->sk_state) &
(TCPF_FIN_WAIT1 | TCPF_CLOSING | TCPF_LAST_ACK)) &&
msk->write_seq == atomic64_read(&msk->snd_una)) {
if (mptcp_pending_data_fin_ack(sk)) {
mptcp_stop_timer(sk);
WRITE_ONCE(msk->snd_data_fin_enable, 0);
......@@ -453,15 +458,15 @@ static bool mptcp_subflow_cleanup_rbuf(struct sock *ssk)
static void mptcp_cleanup_rbuf(struct mptcp_sock *msk)
{
struct sock *ack_hint = READ_ONCE(msk->ack_hint);
struct mptcp_subflow_context *subflow;
/* if the hinted ssk is still active, try to use it */
if (likely(msk->ack_hint)) {
if (likely(ack_hint)) {
mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
if (msk->ack_hint == ssk &&
mptcp_subflow_cleanup_rbuf(ssk))
if (ack_hint == ssk && mptcp_subflow_cleanup_rbuf(ssk))
return;
}
}
......@@ -614,13 +619,13 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
break;
}
} while (more_data_avail);
msk->ack_hint = ssk;
WRITE_ONCE(msk->ack_hint, ssk);
*bytes += moved;
return done;
}
static bool mptcp_ofo_queue(struct mptcp_sock *msk)
static bool __mptcp_ofo_queue(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
struct sk_buff *skb, *tail;
......@@ -666,34 +671,27 @@ static bool mptcp_ofo_queue(struct mptcp_sock *msk)
/* In most cases we will be able to lock the mptcp socket. If its already
* owned, we need to defer to the work queue to avoid ABBA deadlock.
*/
static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
static void move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
{
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
if (READ_ONCE(sk->sk_lock.owned))
return false;
if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
return false;
/* must re-check after taking the lock */
if (!READ_ONCE(sk->sk_lock.owned)) {
__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
mptcp_ofo_queue(msk);
if (inet_sk_state_load(sk) == TCP_CLOSE)
return;
/* If the moves have caught up with the DATA_FIN sequence number
* it's time to ack the DATA_FIN and change socket state, but
* this is not a good place to change state. Let the workqueue
* do it.
*/
if (mptcp_pending_data_fin(sk, NULL))
mptcp_schedule_work(sk);
}
mptcp_data_lock(sk);
spin_unlock_bh(&sk->sk_lock.slock);
__mptcp_move_skbs_from_subflow(msk, ssk, &moved);
__mptcp_ofo_queue(msk);
return moved > 0;
/* If the moves have caught up with the DATA_FIN sequence number
* it's time to ack the DATA_FIN and change socket state, but
* this is not a good place to change state. Let the workqueue
* do it.
*/
if (mptcp_pending_data_fin(sk, NULL))
mptcp_schedule_work(sk);
mptcp_data_unlock(sk);
}
void mptcp_data_ready(struct sock *sk, struct sock *ssk)
......@@ -771,16 +769,6 @@ bool mptcp_schedule_work(struct sock *sk)
return false;
}
void mptcp_data_acked(struct sock *sk)
{
mptcp_reset_timer(sk);
if ((test_bit(MPTCP_NOSPACE, &mptcp_sk(sk)->flags) ||
mptcp_send_head(sk) ||
(inet_sk_state_load(sk) != TCP_ESTABLISHED)))
mptcp_schedule_work(sk);
}
void mptcp_subflow_eof(struct sock *sk)
{
if (!test_and_set_bit(MPTCP_WORK_EOF, &mptcp_sk(sk)->flags))
......@@ -825,16 +813,6 @@ static void mptcp_check_for_eof(struct mptcp_sock *msk)
mptcp_close_wake_up(sk);
}
static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
{
const struct sock *sk = (const struct sock *)msk;
if (!msk->cached_ext)
msk->cached_ext = __skb_ext_alloc(sk->sk_allocation);
return !!msk->cached_ext;
}
static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
{
struct mptcp_subflow_context *subflow;
......@@ -873,6 +851,121 @@ static bool mptcp_frag_can_collapse_to(const struct mptcp_sock *msk,
df->data_seq + df->data_len == msk->write_seq;
}
static int mptcp_wmem_with_overhead(struct sock *sk, int size)
{
struct mptcp_sock *msk = mptcp_sk(sk);
int ret, skbs;
ret = size + ((sizeof(struct mptcp_data_frag) * size) >> PAGE_SHIFT);
skbs = (msk->tx_pending_data + size) / msk->size_goal_cache;
if (skbs < msk->skb_tx_cache.qlen)
return ret;
return ret + (skbs - msk->skb_tx_cache.qlen) * SKB_TRUESIZE(MAX_TCP_HEADER);
}
static void __mptcp_wmem_reserve(struct sock *sk, int size)
{
int amount = mptcp_wmem_with_overhead(sk, size);
struct mptcp_sock *msk = mptcp_sk(sk);
WARN_ON_ONCE(msk->wmem_reserved);
if (amount <= sk->sk_forward_alloc)
goto reserve;
/* under memory pressure try to reserve at most a single page
* otherwise try to reserve the full estimate and fallback
* to a single page before entering the error path
*/
if ((tcp_under_memory_pressure(sk) && amount > PAGE_SIZE) ||
!sk_wmem_schedule(sk, amount)) {
if (amount <= PAGE_SIZE)
goto nomem;
amount = PAGE_SIZE;
if (!sk_wmem_schedule(sk, amount))
goto nomem;
}
reserve:
msk->wmem_reserved = amount;
sk->sk_forward_alloc -= amount;
return;
nomem:
/* we will wait for memory on next allocation */
msk->wmem_reserved = -1;
}
static void __mptcp_update_wmem(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (!msk->wmem_reserved)
return;
if (msk->wmem_reserved < 0)
msk->wmem_reserved = 0;
if (msk->wmem_reserved > 0) {
sk->sk_forward_alloc += msk->wmem_reserved;
msk->wmem_reserved = 0;
}
}
static bool mptcp_wmem_alloc(struct sock *sk, int size)
{
struct mptcp_sock *msk = mptcp_sk(sk);
/* check for pre-existing error condition */
if (msk->wmem_reserved < 0)
return false;
if (msk->wmem_reserved >= size)
goto account;
mptcp_data_lock(sk);
if (!sk_wmem_schedule(sk, size)) {
mptcp_data_unlock(sk);
return false;
}
sk->sk_forward_alloc -= size;
msk->wmem_reserved += size;
mptcp_data_unlock(sk);
account:
msk->wmem_reserved -= size;
return true;
}
static void mptcp_wmem_uncharge(struct sock *sk, int size)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (msk->wmem_reserved < 0)
msk->wmem_reserved = 0;
msk->wmem_reserved += size;
}
static void mptcp_mem_reclaim_partial(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
/* if we are experiencing a transint allocation error,
* the forward allocation memory has been already
* released
*/
if (msk->wmem_reserved < 0)
return;
mptcp_data_lock(sk);
sk->sk_forward_alloc += msk->wmem_reserved;
sk_mem_reclaim_partial(sk);
msk->wmem_reserved = sk->sk_forward_alloc;
sk->sk_forward_alloc = 0;
mptcp_data_unlock(sk);
}
static void dfrag_uncharge(struct sock *sk, int len)
{
sk_mem_uncharge(sk, len);
......@@ -888,7 +981,7 @@ static void dfrag_clear(struct sock *sk, struct mptcp_data_frag *dfrag)
put_page(dfrag->page);
}
static void mptcp_clean_una(struct sock *sk)
static void __mptcp_clean_una(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_data_frag *dtmp, *dfrag;
......@@ -899,10 +992,9 @@ static void mptcp_clean_una(struct sock *sk)
* plain TCP
*/
if (__mptcp_check_fallback(msk))
atomic64_set(&msk->snd_una, msk->snd_nxt);
snd_una = atomic64_read(&msk->snd_una);
msk->snd_una = READ_ONCE(msk->snd_nxt);
snd_una = msk->snd_una;
list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list) {
if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
break;
......@@ -930,36 +1022,34 @@ static void mptcp_clean_una(struct sock *sk)
}
out:
if (cleaned)
sk_mem_reclaim_partial(sk);
}
static void mptcp_clean_una_wakeup(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (cleaned) {
if (tcp_under_memory_pressure(sk)) {
__mptcp_update_wmem(sk);
sk_mem_reclaim_partial(sk);
}
mptcp_clean_una(sk);
if (sk_stream_is_writeable(sk)) {
/* pairs with memory barrier in mptcp_poll */
smp_mb();
if (test_and_clear_bit(MPTCP_NOSPACE, &msk->flags))
sk_stream_write_space(sk);
}
}
/* Only wake up writers if a subflow is ready */
if (sk_stream_is_writeable(sk)) {
clear_bit(MPTCP_NOSPACE, &msk->flags);
sk_stream_write_space(sk);
if (snd_una == READ_ONCE(msk->snd_nxt)) {
if (msk->timer_ival)
mptcp_stop_timer(sk);
} else {
mptcp_reset_timer(sk);
}
}
/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
* data
*/
static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
static void mptcp_enter_memory_pressure(struct sock *sk)
{
struct mptcp_subflow_context *subflow;
struct mptcp_sock *msk = mptcp_sk(sk);
bool first = true;
if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
pfrag, sk->sk_allocation)))
return true;
sk_stream_moderate_sndbuf(sk);
mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
......@@ -969,6 +1059,18 @@ static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
sk_stream_moderate_sndbuf(ssk);
first = false;
}
}
/* ensure we get enough memory for the frag hdr, beyond some minimal amount of
* data
*/
static bool mptcp_page_frag_refill(struct sock *sk, struct page_frag *pfrag)
{
if (likely(skb_page_frag_refill(32U + sizeof(struct mptcp_data_frag),
pfrag, sk->sk_allocation)))
return true;
mptcp_enter_memory_pressure(sk);
return false;
}
......@@ -1015,6 +1117,128 @@ static int mptcp_check_allowed_size(struct mptcp_sock *msk, u64 data_seq,
return avail_size;
}
static bool __mptcp_add_ext(struct sk_buff *skb, gfp_t gfp)
{
struct skb_ext *mpext = __skb_ext_alloc(gfp);
if (!mpext)
return false;
__skb_ext_set(skb, SKB_EXT_MPTCP, mpext);
return true;
}
static struct sk_buff *__mptcp_do_alloc_tx_skb(struct sock *sk, gfp_t gfp)
{
struct sk_buff *skb;
skb = alloc_skb_fclone(MAX_TCP_HEADER, gfp);
if (likely(skb)) {
if (likely(__mptcp_add_ext(skb, gfp))) {
skb_reserve(skb, MAX_TCP_HEADER);
skb->reserved_tailroom = skb->end - skb->tail;
return skb;
}
__kfree_skb(skb);
} else {
mptcp_enter_memory_pressure(sk);
}
return NULL;
}
static bool mptcp_tx_cache_refill(struct sock *sk, int size,
struct sk_buff_head *skbs, int *total_ts)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct sk_buff *skb;
int space_needed;
if (unlikely(tcp_under_memory_pressure(sk))) {
mptcp_mem_reclaim_partial(sk);
/* under pressure pre-allocate at most a single skb */
if (msk->skb_tx_cache.qlen)
return true;
space_needed = msk->size_goal_cache;
} else {
space_needed = msk->tx_pending_data + size -
msk->skb_tx_cache.qlen * msk->size_goal_cache;
}
while (space_needed > 0) {
skb = __mptcp_do_alloc_tx_skb(sk, sk->sk_allocation);
if (unlikely(!skb)) {
/* under memory pressure, try to pass the caller a
* single skb to allow forward progress
*/
while (skbs->qlen > 1) {
skb = __skb_dequeue_tail(skbs);
__kfree_skb(skb);
}
return skbs->qlen > 0;
}
*total_ts += skb->truesize;
__skb_queue_tail(skbs, skb);
space_needed -= msk->size_goal_cache;
}
return true;
}
static bool __mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk, gfp_t gfp)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct sk_buff *skb;
if (ssk->sk_tx_skb_cache) {
skb = ssk->sk_tx_skb_cache;
if (unlikely(!skb_ext_find(skb, SKB_EXT_MPTCP) &&
!__mptcp_add_ext(skb, gfp)))
return false;
return true;
}
skb = skb_peek(&msk->skb_tx_cache);
if (skb) {
if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
skb = __skb_dequeue(&msk->skb_tx_cache);
if (WARN_ON_ONCE(!skb))
return false;
mptcp_wmem_uncharge(sk, skb->truesize);
ssk->sk_tx_skb_cache = skb;
return true;
}
/* over memory limit, no point to try to allocate a new skb */
return false;
}
skb = __mptcp_do_alloc_tx_skb(sk, gfp);
if (!skb)
return false;
if (likely(sk_wmem_schedule(ssk, skb->truesize))) {
ssk->sk_tx_skb_cache = skb;
return true;
}
kfree_skb(skb);
return false;
}
static bool mptcp_must_reclaim_memory(struct sock *sk, struct sock *ssk)
{
return !ssk->sk_tx_skb_cache &&
!skb_peek(&mptcp_sk(sk)->skb_tx_cache) &&
tcp_under_memory_pressure(sk);
}
static bool mptcp_alloc_tx_skb(struct sock *sk, struct sock *ssk)
{
if (unlikely(mptcp_must_reclaim_memory(sk, ssk)))
mptcp_mem_reclaim_partial(sk);
return __mptcp_alloc_tx_skb(sk, ssk, sk->sk_allocation);
}
static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
struct mptcp_data_frag *dfrag,
struct mptcp_sendmsg_info *info)
......@@ -1026,7 +1250,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
struct sk_buff *skb, *tail;
bool can_collapse = false;
int avail_size;
size_t ret;
size_t ret = 0;
pr_debug("msk=%p ssk=%p sending dfrag at seq=%lld len=%d already sent=%d",
msk, ssk, dfrag->data_seq, dfrag->data_len, info->sent);
......@@ -1034,6 +1258,7 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
/* compute send limit */
info->mss_now = tcp_send_mss(ssk, &info->size_goal, info->flags);
avail_size = info->size_goal;
msk->size_goal_cache = info->size_goal;
skb = tcp_write_queue_tail(ssk);
if (skb) {
/* Limit the write to the size available in the
......@@ -1054,10 +1279,12 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
/* Zero window and all data acked? Probe. */
avail_size = mptcp_check_allowed_size(msk, data_seq, avail_size);
if (avail_size == 0) {
if (skb || atomic64_read(&msk->snd_una) != msk->snd_nxt)
u64 snd_una = READ_ONCE(msk->snd_una);
if (skb || snd_una != msk->snd_nxt)
return 0;
zero_window_probe = true;
data_seq = atomic64_read(&msk->snd_una) - 1;
data_seq = snd_una - 1;
avail_size = 1;
}
......@@ -1082,8 +1309,11 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
goto out;
}
mpext = __skb_ext_set(tail, SKB_EXT_MPTCP, msk->cached_ext);
msk->cached_ext = NULL;
mpext = skb_ext_find(tail, SKB_EXT_MPTCP);
if (WARN_ON_ONCE(!mpext)) {
/* should never reach here, stream corrupted */
return -EINVAL;
}
memset(mpext, 0, sizeof(*mpext));
mpext->data_seq = data_seq;
......@@ -1107,31 +1337,6 @@ static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
return ret;
}
static void mptcp_nospace(struct mptcp_sock *msk)
{
struct mptcp_subflow_context *subflow;
set_bit(MPTCP_NOSPACE, &msk->flags);
smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
bool ssk_writeable = sk_stream_is_writeable(ssk);
struct socket *sock = READ_ONCE(ssk->sk_socket);
if (ssk_writeable || !sock)
continue;
/* enables ssk->write_space() callbacks */
set_bit(SOCK_NOSPACE, &sock->flags);
}
/* mptcp_data_acked() could run just before we set the NOSPACE bit,
* so explicitly check for snd_una value
*/
mptcp_clean_una((struct sock *)msk);
}
#define MPTCP_SEND_BURST_SIZE ((1 << 16) - \
sizeof(struct tcphdr) - \
MAX_TCP_OPTION_SPACE - \
......@@ -1156,9 +1361,6 @@ static struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk,
sock_owned_by_me((struct sock *)msk);
*sndbuf = 0;
if (!mptcp_ext_cache_refill(msk))
return NULL;
if (__mptcp_check_fallback(msk)) {
if (!msk->first)
return NULL;
......@@ -1267,6 +1469,15 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
if (ssk != prev_ssk || !prev_ssk)
lock_sock(ssk);
/* keep it simple and always provide a new skb for the
* subflow, even if we will not use it when collapsing
* on the pending one
*/
if (!mptcp_alloc_tx_skb(sk, ssk)) {
mptcp_push_release(sk, ssk, &info);
goto out;
}
ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
if (ret <= 0) {
mptcp_push_release(sk, ssk, &info);
......@@ -1277,6 +1488,7 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
dfrag->already_sent += ret;
msk->snd_nxt += ret;
msk->snd_burst -= ret;
msk->tx_pending_data -= ret;
copied += ret;
len -= ret;
}
......@@ -1296,6 +1508,63 @@ static void mptcp_push_pending(struct sock *sk, unsigned int flags)
}
}
static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_sendmsg_info info;
struct mptcp_data_frag *dfrag;
int len, copied = 0;
info.flags = 0;
while ((dfrag = mptcp_send_head(sk))) {
info.sent = dfrag->already_sent;
info.limit = dfrag->data_len;
len = dfrag->data_len - dfrag->already_sent;
while (len > 0) {
int ret = 0;
/* do auto tuning */
if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK) &&
ssk->sk_sndbuf > READ_ONCE(sk->sk_sndbuf))
WRITE_ONCE(sk->sk_sndbuf, ssk->sk_sndbuf);
if (unlikely(mptcp_must_reclaim_memory(sk, ssk))) {
__mptcp_update_wmem(sk);
sk_mem_reclaim_partial(sk);
}
if (!__mptcp_alloc_tx_skb(sk, ssk, GFP_ATOMIC))
goto out;
ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
if (ret <= 0)
goto out;
info.sent += ret;
dfrag->already_sent += ret;
msk->snd_nxt += ret;
msk->snd_burst -= ret;
msk->tx_pending_data -= ret;
copied += ret;
len -= ret;
}
WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
}
out:
/* __mptcp_alloc_tx_skb could have released some wmem and we are
* not going to flush it via release_sock()
*/
__mptcp_update_wmem(sk);
if (copied) {
mptcp_set_timeout(sk, ssk);
tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
info.size_goal);
if (msk->snd_data_fin_enable &&
msk->snd_nxt + 1 == msk->write_seq)
mptcp_schedule_work(sk);
}
}
static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
{
struct mptcp_sock *msk = mptcp_sk(sk);
......@@ -1307,7 +1576,7 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
if (msg->msg_flags & ~(MSG_MORE | MSG_DONTWAIT | MSG_NOSIGNAL))
return -EOPNOTSUPP;
lock_sock(sk);
mptcp_lock_sock(sk, __mptcp_wmem_reserve(sk, len));
timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
......@@ -1318,11 +1587,11 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
}
pfrag = sk_page_frag(sk);
mptcp_clean_una(sk);
while (msg_data_left(msg)) {
int total_ts, frag_truesize = 0;
struct mptcp_data_frag *dfrag;
int frag_truesize = 0;
struct sk_buff_head skbs;
bool dfrag_collapsed;
size_t psize, offset;
......@@ -1337,11 +1606,9 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
dfrag = mptcp_pending_tail(sk);
dfrag_collapsed = mptcp_frag_can_collapse_to(msk, pfrag, dfrag);
if (!dfrag_collapsed) {
if (!sk_stream_memory_free(sk)) {
mptcp_push_pending(sk, msg->msg_flags);
if (!sk_stream_memory_free(sk))
goto wait_for_memory;
}
if (!sk_stream_memory_free(sk))
goto wait_for_memory;
if (!mptcp_page_frag_refill(sk, pfrag))
goto wait_for_memory;
......@@ -1356,11 +1623,20 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
offset = dfrag->offset + dfrag->data_len;
psize = pfrag->size - offset;
psize = min_t(size_t, psize, msg_data_left(msg));
if (!sk_wmem_schedule(sk, psize + frag_truesize))
total_ts = psize + frag_truesize;
__skb_queue_head_init(&skbs);
if (!mptcp_tx_cache_refill(sk, psize, &skbs, &total_ts))
goto wait_for_memory;
if (!mptcp_wmem_alloc(sk, total_ts)) {
__skb_queue_purge(&skbs);
goto wait_for_memory;
}
skb_queue_splice_tail(&skbs, &msk->skb_tx_cache);
if (copy_page_from_iter(dfrag->page, offset, psize,
&msg->msg_iter) != psize) {
mptcp_wmem_uncharge(sk, psize + frag_truesize);
ret = -EFAULT;
goto out;
}
......@@ -1376,7 +1652,6 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
* Note: we charge such data both to sk and ssk
*/
sk_wmem_queued_add(sk, frag_truesize);
sk->sk_forward_alloc -= frag_truesize;
if (!dfrag_collapsed) {
get_page(dfrag->page);
list_add_tail(&dfrag->list, &msk->rtx_queue);
......@@ -1387,21 +1662,20 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
dfrag->data_seq, dfrag->data_len, dfrag->already_sent,
!dfrag_collapsed);
if (!mptcp_ext_cache_refill(msk))
goto wait_for_memory;
continue;
wait_for_memory:
mptcp_nospace(msk);
if (mptcp_timer_pending(sk))
mptcp_reset_timer(sk);
set_bit(MPTCP_NOSPACE, &msk->flags);
mptcp_push_pending(sk, msg->msg_flags);
ret = sk_stream_wait_memory(sk, &timeo);
if (ret)
goto out;
}
if (copied)
if (copied) {
msk->tx_pending_data += copied;
mptcp_push_pending(sk, msg->msg_flags);
}
out:
release_sock(sk);
......@@ -1427,11 +1701,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
struct msghdr *msg,
size_t len)
{
struct sock *sk = (struct sock *)msk;
struct sk_buff *skb;
int copied = 0;
while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
while ((skb = skb_peek(&msk->receive_queue)) != NULL) {
u32 offset = MPTCP_SKB_CB(skb)->offset;
u32 data_len = skb->len - offset;
u32 count = min_t(size_t, len - copied, data_len);
......@@ -1451,7 +1724,10 @@ static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
break;
}
__skb_unlink(skb, &sk->sk_receive_queue);
/* we will bulk release the skb memory later */
skb->destructor = NULL;
msk->rmem_released += skb->truesize;
__skb_unlink(skb, &msk->receive_queue);
__kfree_skb(skb);
if (copied >= len)
......@@ -1559,25 +1835,47 @@ static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied)
msk->rcvq_space.time = mstamp;
}
static void __mptcp_update_rmem(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (!msk->rmem_released)
return;
atomic_sub(msk->rmem_released, &sk->sk_rmem_alloc);
sk_mem_uncharge(sk, msk->rmem_released);
msk->rmem_released = 0;
}
static void __mptcp_splice_receive_queue(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
skb_queue_splice_tail_init(&sk->sk_receive_queue, &msk->receive_queue);
}
static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
{
struct sock *sk = (struct sock *)msk;
unsigned int moved = 0;
bool done;
/* avoid looping forever below on racing close */
if (((struct sock *)msk)->sk_state == TCP_CLOSE)
return false;
bool ret, done;
__mptcp_flush_join_list(msk);
do {
struct sock *ssk = mptcp_subflow_recv_lookup(msk);
bool slowpath;
if (!ssk)
/* we can have data pending in the subflows only if the msk
* receive buffer was full at subflow_data_ready() time,
* that is an unlikely slow path.
*/
if (likely(!ssk))
break;
slowpath = lock_sock_fast(ssk);
mptcp_data_lock(sk);
done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
mptcp_data_unlock(sk);
if (moved && rcv) {
WRITE_ONCE(msk->rmem_pending, min(rcv, moved));
tcp_cleanup_rbuf(ssk, 1);
......@@ -1586,11 +1884,19 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
unlock_sock_fast(ssk, slowpath);
} while (!done);
if (mptcp_ofo_queue(msk) || moved > 0) {
mptcp_check_data_fin((struct sock *)msk);
return true;
/* acquire the data lock only if some input data is pending */
ret = moved > 0;
if (!RB_EMPTY_ROOT(&msk->out_of_order_queue) ||
!skb_queue_empty_lockless(&sk->sk_receive_queue)) {
mptcp_data_lock(sk);
__mptcp_update_rmem(sk);
ret |= __mptcp_ofo_queue(msk);
__mptcp_splice_receive_queue(sk);
mptcp_data_unlock(sk);
}
return false;
if (ret)
mptcp_check_data_fin((struct sock *)msk);
return !skb_queue_empty(&msk->receive_queue);
}
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
......@@ -1604,7 +1910,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
return -EOPNOTSUPP;
lock_sock(sk);
mptcp_lock_sock(sk, __mptcp_splice_receive_queue(sk));
if (unlikely(sk->sk_state == TCP_LISTEN)) {
copied = -ENOTCONN;
goto out_err;
......@@ -1614,7 +1920,6 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
__mptcp_flush_join_list(msk);
for (;;) {
int bytes_read, old_space;
......@@ -1628,7 +1933,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
copied += bytes_read;
if (skb_queue_empty(&sk->sk_receive_queue) &&
if (skb_queue_empty(&msk->receive_queue) &&
__mptcp_move_skbs(msk, len - copied))
continue;
......@@ -1659,8 +1964,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
mptcp_check_for_eof(msk);
if (sk->sk_shutdown & RCV_SHUTDOWN)
if (sk->sk_shutdown & RCV_SHUTDOWN) {
/* race breaker: the shutdown could be after the
* previous receive queue check
*/
if (__mptcp_move_skbs(msk, len - copied))
continue;
break;
}
if (sk->sk_state == TCP_CLOSE) {
copied = -ENOTCONN;
......@@ -1682,7 +1993,8 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
mptcp_wait_data(sk, &timeo);
}
if (skb_queue_empty(&sk->sk_receive_queue)) {
if (skb_queue_empty_lockless(&sk->sk_receive_queue) &&
skb_queue_empty(&msk->receive_queue)) {
/* entire backlog drained, clear DATA_READY. */
clear_bit(MPTCP_DATA_READY, &msk->flags);
......@@ -1698,7 +2010,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
out_err:
pr_debug("msk=%p data_ready=%d rx queue empty=%d copied=%d",
msk, test_bit(MPTCP_DATA_READY, &msk->flags),
skb_queue_empty(&sk->sk_receive_queue), copied);
skb_queue_empty_lockless(&sk->sk_receive_queue), copied);
mptcp_rcv_space_adjust(msk, copied);
release_sock(sk);
......@@ -1709,12 +2021,8 @@ static void mptcp_retransmit_handler(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (atomic64_read(&msk->snd_una) == READ_ONCE(msk->snd_nxt)) {
mptcp_stop_timer(sk);
} else {
set_bit(MPTCP_WORK_RTX, &msk->flags);
mptcp_schedule_work(sk);
}
set_bit(MPTCP_WORK_RTX, &msk->flags);
mptcp_schedule_work(sk);
}
static void mptcp_retransmit_timer(struct timer_list *t)
......@@ -1915,21 +2223,18 @@ static void mptcp_worker(struct work_struct *work)
if (unlikely(state == TCP_CLOSE))
goto unlock;
mptcp_clean_una_wakeup(sk);
mptcp_check_data_fin_ack(sk);
__mptcp_flush_join_list(msk);
if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
__mptcp_close_subflow(msk);
if (mptcp_send_head(sk))
mptcp_push_pending(sk, 0);
if (msk->pm.status)
pm_work(msk);
if (test_and_clear_bit(MPTCP_WORK_EOF, &msk->flags))
mptcp_check_for_eof(msk);
__mptcp_check_send_data_fin(sk);
mptcp_check_data_fin(sk);
/* if the msk data is completely acked, or the socket timedout,
......@@ -1951,9 +2256,6 @@ static void mptcp_worker(struct work_struct *work)
if (!dfrag)
goto unlock;
if (!mptcp_ext_cache_refill(msk))
goto reset_unlock;
ssk = mptcp_subflow_get_retrans(msk);
if (!ssk)
goto reset_unlock;
......@@ -1964,6 +2266,9 @@ static void mptcp_worker(struct work_struct *work)
info.sent = 0;
info.limit = dfrag->already_sent;
while (info.sent < dfrag->already_sent) {
if (!mptcp_alloc_tx_skb(sk, ssk))
break;
ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
if (ret <= 0)
break;
......@@ -1971,9 +2276,6 @@ static void mptcp_worker(struct work_struct *work)
MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
copied += ret;
info.sent += ret;
if (!mptcp_ext_cache_refill(msk))
break;
}
if (copied)
tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
......@@ -2001,8 +2303,14 @@ static int __mptcp_init_sock(struct sock *sk)
INIT_LIST_HEAD(&msk->join_list);
INIT_LIST_HEAD(&msk->rtx_queue);
INIT_WORK(&msk->work, mptcp_worker);
__skb_queue_head_init(&msk->receive_queue);
__skb_queue_head_init(&msk->skb_tx_cache);
msk->out_of_order_queue = RB_ROOT;
msk->first_pending = NULL;
msk->wmem_reserved = 0;
msk->rmem_released = 0;
msk->tx_pending_data = 0;
msk->size_goal_cache = TCP_BASE_MSS;
msk->ack_hint = NULL;
msk->first = NULL;
......@@ -2046,12 +2354,15 @@ static void __mptcp_clear_xmit(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
struct mptcp_data_frag *dtmp, *dfrag;
sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
struct sk_buff *skb;
WRITE_ONCE(msk->first_pending, NULL);
list_for_each_entry_safe(dfrag, dtmp, &msk->rtx_queue, list)
dfrag_clear(sk, dfrag);
while ((skb = __skb_dequeue(&msk->skb_tx_cache)) != NULL) {
sk->sk_forward_alloc += skb->truesize;
kfree_skb(skb);
}
}
static void mptcp_cancel_work(struct sock *sk)
......@@ -2186,7 +2497,7 @@ static void __mptcp_destroy_sock(struct sock *sk)
spin_unlock_bh(&msk->join_list_lock);
list_splice_init(&msk->conn_list, &conn_list);
__mptcp_clear_xmit(sk);
sk_stop_timer(sk, &msk->sk.icsk_retransmit_timer);
sk_stop_timer(sk, &sk->sk_timer);
msk->pm.status = 0;
......@@ -2197,6 +2508,8 @@ static void __mptcp_destroy_sock(struct sock *sk)
sk->sk_prot->destroy(sk);
WARN_ON_ONCE(msk->wmem_reserved);
WARN_ON_ONCE(msk->rmem_released);
sk_stream_kill_queues(sk);
xfrm_sk_free_policy(sk);
sk_refcnt_debug_release(sk);
......@@ -2326,8 +2639,8 @@ struct sock *mptcp_sk_clone(const struct sock *sk,
msk->write_seq = subflow_req->idsn + 1;
msk->snd_nxt = msk->write_seq;
atomic64_set(&msk->snd_una, msk->write_seq);
atomic64_set(&msk->wnd_end, msk->snd_nxt + req->rsk_rcv_wnd);
msk->snd_una = msk->write_seq;
msk->wnd_end = msk->snd_nxt + req->rsk_rcv_wnd;
if (mp_opt->mp_capable) {
msk->can_ack = true;
......@@ -2363,7 +2676,7 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk)
if (msk->rcvq_space.space == 0)
msk->rcvq_space.space = TCP_INIT_CWND * TCP_MSS_DEFAULT;
atomic64_set(&msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
WRITE_ONCE(msk->wnd_end, msk->snd_nxt + tcp_sk(ssk)->snd_wnd);
}
static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
......@@ -2414,6 +2727,13 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
void mptcp_destroy_common(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
__mptcp_clear_xmit(sk);
/* move to sk_receive_queue, sk_stream_kill_queues will purge it */
skb_queue_splice_tail_init(&msk->receive_queue, &sk->sk_receive_queue);
skb_rbtree_purge(&msk->out_of_order_queue);
mptcp_token_destroy(msk);
mptcp_pm_free_anno_list(msk);
......@@ -2423,9 +2743,6 @@ static void mptcp_destroy(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (msk->cached_ext)
__skb_ext_put(msk->cached_ext);
mptcp_destroy_common(msk);
sk_sockets_allocated_dec(sk);
}
......@@ -2540,15 +2857,58 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
return -EOPNOTSUPP;
}
void __mptcp_data_acked(struct sock *sk)
{
if (!sock_owned_by_user(sk))
__mptcp_clean_una(sk);
else
set_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags);
if (mptcp_pending_data_fin_ack(sk))
mptcp_schedule_work(sk);
}
void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk)
{
if (!mptcp_send_head(sk))
return;
if (!sock_owned_by_user(sk))
__mptcp_subflow_push_pending(sk, ssk);
else
set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags);
}
#define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED)
/* this is very alike tcp_release_cb() but we must handle differently a
* different set of events
*/
/* processes deferred events and flush wmem */
static void mptcp_release_cb(struct sock *sk)
{
unsigned long flags, nflags;
/* push_pending may touch wmem_reserved, do it before the later
* cleanup
*/
if (test_and_clear_bit(MPTCP_CLEAN_UNA, &mptcp_sk(sk)->flags))
__mptcp_clean_una(sk);
if (test_and_clear_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->flags)) {
/* mptcp_push_pending() acquires the subflow socket lock
*
* 1) can't be invoked in atomic scope
* 2) must avoid ABBA deadlock with msk socket spinlock: the RX
* datapath acquires the msk socket spinlock while helding
* the subflow socket lock
*/
spin_unlock_bh(&sk->sk_lock.slock);
mptcp_push_pending(sk, 0);
spin_lock_bh(&sk->sk_lock.slock);
}
/* clear any wmem reservation and errors */
__mptcp_update_wmem(sk);
__mptcp_update_rmem(sk);
do {
flags = sk->sk_tsq_flags;
if (!(flags & MPTCP_DEFERRED_ALL))
......@@ -2619,7 +2979,7 @@ void mptcp_finish_connect(struct sock *ssk)
WRITE_ONCE(msk->ack_seq, ack_seq);
WRITE_ONCE(msk->rcv_wnd_sent, ack_seq);
WRITE_ONCE(msk->can_ack, 1);
atomic64_set(&msk->snd_una, msk->write_seq);
WRITE_ONCE(msk->snd_una, msk->write_seq);
mptcp_pm_new_connection(msk, 0);
......@@ -2880,24 +3240,9 @@ static __poll_t mptcp_check_readable(struct mptcp_sock *msk)
0;
}
static bool __mptcp_check_writeable(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
bool mptcp_writable;
mptcp_clean_una(sk);
mptcp_writable = sk_stream_is_writeable(sk);
if (!mptcp_writable)
mptcp_nospace(msk);
return mptcp_writable;
}
static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
{
struct sock *sk = (struct sock *)msk;
__poll_t ret = 0;
bool slow;
if (unlikely(sk->sk_shutdown & SEND_SHUTDOWN))
return 0;
......@@ -2905,12 +3250,12 @@ static __poll_t mptcp_check_writeable(struct mptcp_sock *msk)
if (sk_stream_is_writeable(sk))
return EPOLLOUT | EPOLLWRNORM;
slow = lock_sock_fast(sk);
if (__mptcp_check_writeable(msk))
ret = EPOLLOUT | EPOLLWRNORM;
set_bit(MPTCP_NOSPACE, &msk->flags);
smp_mb__after_atomic(); /* msk->flags is changed by write_space cb */
if (sk_stream_is_writeable(sk))
return EPOLLOUT | EPOLLWRNORM;
unlock_sock_fast(sk, slow);
return ret;
return 0;
}
static __poll_t mptcp_poll(struct file *file, struct socket *sock,
......
......@@ -91,6 +91,8 @@
#define MPTCP_WORK_EOF 3
#define MPTCP_FALLBACK_DONE 4
#define MPTCP_WORK_CLOSE_SUBFLOW 5
#define MPTCP_PUSH_PENDING 6
#define MPTCP_CLEAN_UNA 7
static inline bool before64(__u64 seq1, __u64 seq2)
{
......@@ -218,14 +220,16 @@ struct mptcp_sock {
u64 ack_seq;
u64 rcv_wnd_sent;
u64 rcv_data_fin_seq;
int wmem_reserved;
struct sock *last_snd;
int snd_burst;
int old_wspace;
atomic64_t snd_una;
atomic64_t wnd_end;
u64 snd_una;
u64 wnd_end;
unsigned long timer_ival;
u32 token;
int rmem_pending;
int rmem_released;
unsigned long flags;
bool can_ack;
bool fully_established;
......@@ -237,11 +241,14 @@ struct mptcp_sock {
struct work_struct work;
struct sk_buff *ooo_last_skb;
struct rb_root out_of_order_queue;
struct sk_buff_head receive_queue;
struct sk_buff_head skb_tx_cache; /* this is wmem accounted */
int tx_pending_data;
int size_goal_cache;
struct list_head conn_list;
struct list_head rtx_queue;
struct mptcp_data_frag *first_pending;
struct list_head join_list;
struct skb_ext *cached_ext; /* for the next sendmsg */
struct socket *subflow; /* outgoing connect/listener/!mp_capable */
struct sock *first;
struct mptcp_pm_data pm;
......@@ -253,6 +260,22 @@ struct mptcp_sock {
} rcvq_space;
};
#define mptcp_lock_sock(___sk, cb) do { \
struct sock *__sk = (___sk); /* silence macro reuse warning */ \
might_sleep(); \
spin_lock_bh(&__sk->sk_lock.slock); \
if (__sk->sk_lock.owned) \
__lock_sock(__sk); \
cb; \
__sk->sk_lock.owned = 1; \
spin_unlock(&__sk->sk_lock.slock); \
mutex_acquire(&__sk->sk_lock.dep_map, 0, 0, _RET_IP_); \
local_bh_enable(); \
} while (0)
#define mptcp_data_lock(sk) spin_lock_bh(&(sk)->sk_lock.slock)
#define mptcp_data_unlock(sk) spin_unlock_bh(&(sk)->sk_lock.slock)
#define mptcp_for_each_subflow(__msk, __subflow) \
list_for_each_entry(__subflow, &((__msk)->conn_list), node)
......@@ -300,7 +323,7 @@ static inline struct mptcp_data_frag *mptcp_rtx_tail(const struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
if (!before64(msk->snd_nxt, atomic64_read(&msk->snd_una)))
if (!before64(msk->snd_nxt, READ_ONCE(msk->snd_una)))
return NULL;
return list_last_entry(&msk->rtx_queue, struct mptcp_data_frag, list);
......@@ -474,7 +497,8 @@ void mptcp_rcv_space_init(struct mptcp_sock *msk, const struct sock *ssk);
void mptcp_data_ready(struct sock *sk, struct sock *ssk);
bool mptcp_finish_join(struct sock *sk);
bool mptcp_schedule_work(struct sock *sk);
void mptcp_data_acked(struct sock *sk);
void __mptcp_wnd_updated(struct sock *sk, struct sock *ssk);
void __mptcp_data_acked(struct sock *sk);
void mptcp_subflow_eof(struct sock *sk);
bool mptcp_update_rcv_data_fin(struct mptcp_sock *msk, u64 data_fin_seq, bool use_64bit);
void __mptcp_flush_join_list(struct mptcp_sock *msk);
......
......@@ -995,19 +995,9 @@ static void subflow_data_ready(struct sock *sk)
mptcp_data_ready(parent, sk);
}
static void subflow_write_space(struct sock *sk)
static void subflow_write_space(struct sock *ssk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(sk);
struct socket *sock = READ_ONCE(sk->sk_socket);
struct sock *parent = subflow->conn;
if (!sk_stream_is_writeable(sk))
return;
if (sock && sk_stream_is_writeable(parent))
clear_bit(SOCK_NOSPACE, &sock->flags);
sk_stream_write_space(parent);
/* we take action in __mptcp_clean_una() */
}
static struct inet_connection_sock_af_ops *
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment