Commit 7282c126 authored by David S. Miller

Merge branch 'smc-datapath-opts'

Dust Li says:

====================
net/smc: some datapath performance optimizations

This series tries to improve the performance of SMC in datapath.

- patch #1, add a sysctl interface to support tuning the behaviour of
  SMC in container environments.

- patch #2/#3, add autocorking support, which is very efficient for small
  messages without a latency trade-off.

- patch #4, send directly when setting TCP_NODELAY, without waking up the
  TX worker; this makes it consistent with clearing TCP_CORK.

- patch #5, correct the setting of the RMB window update limit, so
  we don't send CDC messages to update the peer's RMB window too frequently
  in some cases.

- patch #6, implement NAPI-like polling in SMC, decreasing the number
  of hardirqs when busy.

- patch #7, move the TX work done in BH context to the user context when
  the sock lock is held by the user.

With this patchset applied, we get a good performance gain:
- The qperf tcp_bw test shows a great improvement. Other benchmarks like
  'netperf TCP_STREAM' or 'sockperf throughput' show similar results.
- In my testing environment, running qperf tcp_bw and tcp_lat, SMC behaves
  better than TCP at almost all message sizes.

Here are some test results with the following testing commands:
client: smc_run taskset -c 1 qperf smc-server -oo msg_size:1:64K:*2 \
		-t 30 -vu tcp_{bw|lat}
server: smc_run taskset -c 1 qperf

==== Bandwidth ====
 MsgSize        Origin SMC              TCP                SMC with patches
       1         0.578 MB/s      2.392 MB/s(313.57%)      2.561 MB/s(342.83%)
       2         1.159 MB/s      4.780 MB/s(312.53%)      5.162 MB/s(345.46%)
       4         2.283 MB/s     10.266 MB/s(349.77%)     10.122 MB/s(343.46%)
       8         4.668 MB/s     19.040 MB/s(307.86%)     20.521 MB/s(339.59%)
      16         9.147 MB/s     38.904 MB/s(325.31%)     40.823 MB/s(346.29%)
      32        18.369 MB/s     79.587 MB/s(333.25%)     80.535 MB/s(338.42%)
      64        36.562 MB/s    148.668 MB/s(306.61%)    158.170 MB/s(332.60%)
     128        72.961 MB/s    274.913 MB/s(276.80%)    316.217 MB/s(333.41%)
     256       144.705 MB/s    512.059 MB/s(253.86%)    626.019 MB/s(332.62%)
     512       288.873 MB/s    884.977 MB/s(206.35%)   1221.596 MB/s(322.88%)
    1024       574.180 MB/s   1337.736 MB/s(132.98%)   2203.156 MB/s(283.70%)
    2048      1095.192 MB/s   1865.952 MB/s( 70.38%)   3036.448 MB/s(177.25%)
    4096      2066.157 MB/s   2380.337 MB/s( 15.21%)   3834.271 MB/s( 85.58%)
    8192      3717.198 MB/s   2733.073 MB/s(-26.47%)   4904.910 MB/s( 31.95%)
   16384      4742.221 MB/s   2958.693 MB/s(-37.61%)   5220.272 MB/s( 10.08%)
   32768      5349.550 MB/s   3061.285 MB/s(-42.77%)   5321.865 MB/s( -0.52%)
   65536      5162.919 MB/s   3731.408 MB/s(-27.73%)   5245.021 MB/s(  1.59%)
==== Latency ====
 MsgSize        Origin SMC              TCP                SMC with patches
       1        10.540 us     11.938 us( 13.26%)         10.356 us( -1.75%)
       2        10.996 us     11.992 us(  9.06%)         10.073 us( -8.39%)
       4        10.229 us     11.687 us( 14.25%)          9.996 us( -2.28%)
       8        10.203 us     11.653 us( 14.21%)         10.063 us( -1.37%)
      16        10.530 us     11.313 us(  7.44%)         10.013 us( -4.91%)
      32        10.241 us     11.586 us( 13.13%)         10.081 us( -1.56%)
      64        10.693 us     11.652 us(  8.97%)          9.986 us( -6.61%)
     128        10.597 us     11.579 us(  9.27%)         10.262 us( -3.16%)
     256        10.409 us     11.957 us( 14.87%)         10.148 us( -2.51%)
     512        11.088 us     12.505 us( 12.78%)         10.206 us( -7.95%)
    1024        11.240 us     12.255 us(  9.03%)         10.631 us( -5.42%)
    2048        11.485 us     16.970 us( 47.76%)         10.981 us( -4.39%)
    4096        12.077 us     13.948 us( 15.49%)         11.847 us( -1.90%)
    8192        13.683 us     16.693 us( 22.00%)         13.336 us( -2.54%)
   16384        16.470 us     23.615 us( 43.38%)         16.519 us(  0.30%)
   32768        22.540 us     40.966 us( 81.75%)         22.452 us( -0.39%)
   65536        34.192 us     73.003 us(113.51%)         33.916 us( -0.81%)
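
(Note: the relative percentages in both tables are not defined in the cover
letter; they are consistent with being computed against the "Origin SMC"
column, i.e.

    gain(%) = (value / Origin SMC - 1) * 100

For example, the 16384-byte TCP latency: (23.615 / 16.470 - 1) * 100 = 43.38%,
matching the table. So positive values in the bandwidth table and negative
values in the latency table both mean "better than origin SMC".)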

------------
Test environment notes:
1. Testing is run on 2 VMs within the same physical host.
2. The NIC is a ConnectX-4 Lx, using SR-IOV, with 2 VFs passed through to
   the 2 VMs respectively.
3. To decrease jitter, each VM's vCPUs are bound to dedicated physical CPUs,
   and those physical CPUs are all isolated using the boot parameter
   `isolcpus=xxx`.
4. The queue number is set to 1, and the queue's interrupt is bound to
   CPU0 in the guest.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
parents 1e385c08 6b88af83
.. SPDX-License-Identifier: GPL-2.0

==========
SMC Sysctl
==========

/proc/sys/net/smc/* Variables
=============================

autocorking_size - INTEGER
    Setting of the SMC auto corking size:
    SMC auto corking is similar to TCP auto corking from the application's
    point of view. When an application does consecutive small
    write()/sendmsg() system calls, we try to coalesce these small writes
    as much as possible, to lower the total number of CDC messages and
    RDMA Writes sent.

    autocorking_size limits the maximum number of corked bytes that can be
    sent to the underlying device in a single send. If set to 0, SMC auto
    corking is disabled.

    Applications can still use TCP_CORK for optimal behavior when they
    know how/when to uncork their sockets.

    Default: 64K
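
As an illustration only (not part of this patchset), here is a minimal
user-space sketch of reading and tuning the knob through the procfs path
documented above; writing 0 disables autocorking, and changing the value
requires root:

    /* Hypothetical example, not from this series: query and update
     * /proc/sys/net/smc/autocorking_size.
     */
    #include <stdio.h>

    int main(void)
    {
            const char *path = "/proc/sys/net/smc/autocorking_size";
            unsigned int size = 0;
            FILE *f;

            f = fopen(path, "r");
            if (!f || fscanf(f, "%u", &size) != 1) {
                    perror(path);
                    return 1;
            }
            fclose(f);
            printf("autocorking_size is %u bytes\n", size);

            f = fopen(path, "w");                   /* needs root privileges */
            if (!f) {
                    perror(path);
                    return 1;
            }
            fprintf(f, "%u\n", 128U * 1024);        /* e.g. raise the limit to 128K */
            fclose(f);
            return 0;
    }
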
@@ -14,5 +14,9 @@ struct netns_smc {
 	struct smc_stats_rsn	*fback_rsn;
 
 	bool			limit_smc_hs;	/* constraint on handshake */
+#ifdef CONFIG_SYSCTL
+	struct ctl_table_header	*smc_hdr;
+#endif
+	unsigned int		sysctl_autocorking_size;
 };
 #endif
@@ -4,4 +4,4 @@ obj-$(CONFIG_SMC)	+= smc.o
 obj-$(CONFIG_SMC_DIAG)	+= smc_diag.o
 smc-y := af_smc.o smc_pnet.o smc_ib.o smc_clc.o smc_core.o smc_wr.o smc_llc.o
 smc-y += smc_cdc.o smc_tx.o smc_rx.o smc_close.o smc_ism.o smc_netlink.o smc_stats.o
-smc-y += smc_tracepoint.o
+smc-y += smc_tracepoint.o smc_sysctl.o
@@ -51,6 +51,7 @@
 #include "smc_close.h"
 #include "smc_stats.h"
 #include "smc_tracepoint.h"
+#include "smc_sysctl.h"
 
 static DEFINE_MUTEX(smc_server_lgr_pending);	/* serialize link group
 						 * creation on server
@@ -192,12 +193,27 @@ void smc_unhash_sk(struct sock *sk)
 }
 EXPORT_SYMBOL_GPL(smc_unhash_sk);
 
+/* This will be called before the user actually releases the sock lock,
+ * so do the work that we could not do in BH context while the user
+ * held the sock lock.
+ */
+static void smc_release_cb(struct sock *sk)
+{
+	struct smc_sock *smc = smc_sk(sk);
+
+	if (smc->conn.tx_in_release_sock) {
+		smc_tx_pending(&smc->conn);
+		smc->conn.tx_in_release_sock = false;
+	}
+}
+
 struct proto smc_proto = {
 	.name		= "SMC",
 	.owner		= THIS_MODULE,
 	.keepalive	= smc_set_keepalive,
 	.hash		= smc_hash_sk,
 	.unhash		= smc_unhash_sk,
+	.release_cb	= smc_release_cb,
 	.obj_size	= sizeof(struct smc_sock),
 	.h.smc_hash	= &smc_v4_hashinfo,
 	.slab_flags	= SLAB_TYPESAFE_BY_RCU,
@@ -210,6 +226,7 @@ struct proto smc_proto6 = {
 	.keepalive	= smc_set_keepalive,
 	.hash		= smc_hash_sk,
 	.unhash		= smc_unhash_sk,
+	.release_cb	= smc_release_cb,
 	.obj_size	= sizeof(struct smc_sock),
 	.h.smc_hash	= &smc_v6_hashinfo,
 	.slab_flags	= SLAB_TYPESAFE_BY_RCU,
@@ -2795,8 +2812,8 @@ static int smc_setsockopt(struct socket *sock, int level, int optname,
 		    sk->sk_state != SMC_CLOSED) {
 			if (val) {
 				SMC_STAT_INC(smc, ndly_cnt);
-				mod_delayed_work(smc->conn.lgr->tx_wq,
-						 &smc->conn.tx_work, 0);
+				smc_tx_pending(&smc->conn);
+				cancel_delayed_work(&smc->conn.tx_work);
 			}
 		}
 		break;
@@ -3273,9 +3290,17 @@ static int __init smc_init(void)
 		goto out_sock;
 	}
 
+	rc = smc_sysctl_init();
+	if (rc) {
+		pr_err("%s: sysctl_init fails with %d\n", __func__, rc);
+		goto out_ulp;
+	}
+
 	static_branch_enable(&tcp_have_smc);
 	return 0;
 
+out_ulp:
+	tcp_unregister_ulp(&smc_ulp_ops);
 out_sock:
 	sock_unregister(PF_SMC);
 out_proto6:
@@ -3303,6 +3328,7 @@ static int __init smc_init(void)
 static void __exit smc_exit(void)
 {
 	static_branch_disable(&tcp_have_smc);
+	smc_sysctl_exit();
 	tcp_unregister_ulp(&smc_ulp_ops);
 	sock_unregister(PF_SMC);
 	smc_core_exit();
...
@@ -29,6 +29,7 @@
 #define SMC_MAX_ISM_DEVS	8	/* max # of proposed non-native ISM
					 * devices
					 */
+#define SMC_AUTOCORKING_DEFAULT_SIZE	0x10000	/* 64K by default */
 
 extern struct proto smc_proto;
 extern struct proto smc_proto6;
@@ -192,6 +193,7 @@ struct smc_connection {
						 *   - dec on polled tx cqe
						 */
	wait_queue_head_t	cdc_pend_tx_wq;	/* wakeup on no cdc_pend_tx_wr*/
+	atomic_t		tx_pushing;	/* nr_threads trying tx push */
	struct delayed_work	tx_work;	/* retry of smc_cdc_msg_send */
	u32			tx_off;		/* base offset in peer rmb */
@@ -211,6 +213,10 @@ struct smc_connection {
						 * data still pending
						 */
	char			urg_rx_byte;	/* urgent byte */
+	bool			tx_in_release_sock;
+						/* flush pending tx data in
+						 * sock release_cb()
+						 */
	atomic_t		bytes_to_rcv;	/* arrived data,
						 * not yet received
						 */
...
@@ -48,9 +48,19 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
	}
 
-	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
-	    unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
-		wake_up(&conn->cdc_pend_tx_wq);
+	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
+		/* If the user owns the sock lock, mark the connection as
+		 * needing a send; the user context will push the pending
+		 * data when it releases the sock lock in smc_release_cb().
+		 */
+		if (sock_owned_by_user(&smc->sk))
+			conn->tx_in_release_sock = true;
+		else
+			smc_tx_pending(conn);
+
+		if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
+			wake_up(&conn->cdc_pend_tx_wq);
+	}
	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
 
	smc_tx_sndbuf_nonfull(smc);
@@ -350,8 +360,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
-	    conn->local_rx_ctrl.prod_flags.urg_data_pending)
-		smc_tx_sndbuf_nonempty(conn);
+	    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
+		if (!sock_owned_by_user(&smc->sk))
+			smc_tx_pending(conn);
+		else
+			conn->tx_in_release_sock = true;
+	}
 
	if (diff_cons && conn->urg_tx_pend &&
	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
...
@@ -1988,7 +1988,7 @@ static struct smc_buf_desc *smc_buf_get_slot(int compressed_bufsize,
 */
static inline int smc_rmb_wnd_update_limit(int rmbe_size)
{
-	return min_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2);
+	return max_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2);
}
 
/* map an rmb buf to a link */
...
// SPDX-License-Identifier: GPL-2.0
/*
 *  Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 *  smc_sysctl.c: sysctl interface to SMC subsystem.
 *
 *  Copyright (c) 2022, Alibaba Inc.
 *
 *  Author: Tony Lu <tonylu@linux.alibaba.com>
 *
 */

#include <linux/init.h>
#include <linux/sysctl.h>
#include <net/net_namespace.h>

#include "smc.h"
#include "smc_sysctl.h"

static struct ctl_table smc_table[] = {
	{
		.procname	= "autocorking_size",
		.data		= &init_net.smc.sysctl_autocorking_size,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_douintvec,
	},
	{  }
};

static __net_init int smc_sysctl_init_net(struct net *net)
{
	struct ctl_table *table;

	table = smc_table;
	if (!net_eq(net, &init_net)) {
		int i;

		table = kmemdup(table, sizeof(smc_table), GFP_KERNEL);
		if (!table)
			goto err_alloc;

		for (i = 0; i < ARRAY_SIZE(smc_table) - 1; i++)
			table[i].data += (void *)net - (void *)&init_net;
	}

	net->smc.smc_hdr = register_net_sysctl(net, "net/smc", table);
	if (!net->smc.smc_hdr)
		goto err_reg;

	net->smc.sysctl_autocorking_size = SMC_AUTOCORKING_DEFAULT_SIZE;

	return 0;

err_reg:
	if (!net_eq(net, &init_net))
		kfree(table);
err_alloc:
	return -ENOMEM;
}

static __net_exit void smc_sysctl_exit_net(struct net *net)
{
	unregister_net_sysctl_table(net->smc.smc_hdr);
}

static struct pernet_operations smc_sysctl_ops __net_initdata = {
	.init = smc_sysctl_init_net,
	.exit = smc_sysctl_exit_net,
};

int __init smc_sysctl_init(void)
{
	return register_pernet_subsys(&smc_sysctl_ops);
}

void smc_sysctl_exit(void)
{
	unregister_pernet_subsys(&smc_sysctl_ops);
}
/* SPDX-License-Identifier: GPL-2.0 */
/*
 *  Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 *  smc_sysctl.c: sysctl interface to SMC subsystem.
 *
 *  Copyright (c) 2022, Alibaba Inc.
 *
 *  Author: Tony Lu <tonylu@linux.alibaba.com>
 *
 */

#ifndef _SMC_SYSCTL_H
#define _SMC_SYSCTL_H

#ifdef CONFIG_SYSCTL

int smc_sysctl_init(void);
void smc_sysctl_exit(void);

#else

int smc_sysctl_init(void)
{
	return 0;
}

void smc_sysctl_exit(void) { }

#endif /* CONFIG_SYSCTL */

#endif /* _SMC_SYSCTL_H */
@@ -131,6 +131,51 @@ static bool smc_tx_is_corked(struct smc_sock *smc)
 	return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
 }
 
+/* If we have pending CDC messages, do not send:
+ * because the CQE of this CDC message will arrive shortly, it gives
+ * a chance to coalesce future sendmsg() payloads into one RDMA Write,
+ * without the need for a timer, and with no latency trade-off.
+ * Algorithm here:
+ *  1. First message should never cork
+ *  2. If we have pending Tx CDC messages, wait for the first CDC
+ *     message's completion
+ *  3. Don't cork too much data in a single RDMA Write to prevent burst
+ *     traffic, total corked messages should not exceed sendbuf/2
+ */
+static bool smc_should_autocork(struct smc_sock *smc)
+{
+	struct smc_connection *conn = &smc->conn;
+	int corking_size;
+
+	corking_size = min(sock_net(&smc->sk)->smc.sysctl_autocorking_size,
+			   conn->sndbuf_desc->len >> 1);
+
+	if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
+	    smc_tx_prepared_sends(conn) > corking_size)
+		return false;
+	return true;
+}
+
+static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
+{
+	struct smc_connection *conn = &smc->conn;
+
+	if (smc_should_autocork(smc))
+		return true;
+
+	/* for a corked socket defer the RDMA writes if
+	 * sndbuf_space is still available. The applications
+	 * should know how/when to uncork it.
+	 */
+	if ((msg->msg_flags & MSG_MORE ||
+	     smc_tx_is_corked(smc) ||
+	     msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
+	    atomic_read(&conn->sndbuf_space))
+		return true;
+
+	return false;
+}
+
 /* sndbuf producer: main API called by socket layer.
  * called under sock lock.
  */
@@ -235,13 +280,10 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
	 */
	if ((msg->msg_flags & MSG_OOB) && !send_remaining)
		conn->urg_tx_pend = true;
 
-	/* for a corked socket defer the RDMA writes if
-	 * sndbuf_space is still available. The applications
-	 * should known how/when to uncork it.
-	 */
-	if (!((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) ||
-	       msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
-	      atomic_read(&conn->sndbuf_space)))
+	/* If we need to cork, do nothing and wait for the next
+	 * sendmsg() call or push on tx completion
+	 */
+	if (!smc_tx_should_cork(smc, msg))
		smc_tx_sndbuf_nonempty(conn);
 
	trace_smc_tx_sendmsg(smc, copylen);
@@ -589,13 +631,26 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
	return rc;
}
 
-int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
-	int rc;
+	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+	int rc = 0;
+
+	/* No data in the send queue */
+	if (unlikely(smc_tx_prepared_sends(conn) <= 0))
+		goto out;
+
+	/* Peer doesn't have RMBE space */
+	if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
+		SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
+		goto out;
+	}
 
	if (conn->killed ||
-	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
-		return -EPIPE; /* connection being aborted */
+	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
+		rc = -EPIPE; /* connection being aborted */
+		goto out;
+	}
	if (conn->lgr->is_smcd)
		rc = smcd_tx_sndbuf_nonempty(conn);
	else
@@ -603,10 +658,38 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
	if (!rc) {
		/* trigger socket release if connection is closing */
-		struct smc_sock *smc = container_of(conn, struct smc_sock,
-						    conn);
		smc_close_wake_tx_prepared(smc);
	}
+
+out:
+	return rc;
+}
+
+int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+{
+	int rc;
+
+	/* This makes sure only one thread can send simultaneously, to
+	 * prevent wasting CPU and CDC slots.
+	 * Record whether someone has tried to push while we are pushing.
+	 */
+	if (atomic_inc_return(&conn->tx_pushing) > 1)
+		return 0;
+
+again:
+	atomic_set(&conn->tx_pushing, 1);
+	smp_wmb(); /* Make sure tx_pushing is 1 before real send */
+	rc = __smc_tx_sndbuf_nonempty(conn);
+
+	/* We need to check whether someone else has added data to the send
+	 * queue and tried to push it, but failed after the atomic_set()
+	 * above while we were pushing.
+	 * If so, we need to push again to prevent that data from hanging
+	 * in the send queue.
+	 */
+	if (unlikely(!atomic_dec_and_test(&conn->tx_pushing)))
+		goto again;
+
	return rc;
}
...
@@ -137,25 +137,28 @@ static void smc_wr_tx_tasklet_fn(struct tasklet_struct *t)
 {
 	struct smc_ib_device *dev = from_tasklet(dev, t, send_tasklet);
 	struct ib_wc wc[SMC_WR_MAX_POLL_CQE];
-	int i = 0, rc;
-	int polled = 0;
+	int i, rc;
 
 again:
-	polled++;
 	do {
 		memset(&wc, 0, sizeof(wc));
 		rc = ib_poll_cq(dev->roce_cq_send, SMC_WR_MAX_POLL_CQE, wc);
-		if (polled == 1) {
-			ib_req_notify_cq(dev->roce_cq_send,
-					 IB_CQ_NEXT_COMP |
-					 IB_CQ_REPORT_MISSED_EVENTS);
-		}
-		if (!rc)
-			break;
 		for (i = 0; i < rc; i++)
 			smc_wr_tx_process_cqe(&wc[i]);
+		if (rc < SMC_WR_MAX_POLL_CQE)
+			/* If < SMC_WR_MAX_POLL_CQE, the CQ should have been
+			 * drained, no need to poll again. --Guangguan Wang
+			 */
+			break;
 	} while (rc > 0);
-	if (polled == 1)
+
+	/* IB_CQ_REPORT_MISSED_EVENTS makes sure that if ib_req_notify_cq()
+	 * returns 0, it is safe to wait for the next event. Otherwise we
+	 * must poll the CQ again to make sure we won't miss any event.
+	 */
+	if (ib_req_notify_cq(dev->roce_cq_send,
+			     IB_CQ_NEXT_COMP |
+			     IB_CQ_REPORT_MISSED_EVENTS))
 		goto again;
 }
@@ -478,24 +481,28 @@ static void smc_wr_rx_tasklet_fn(struct tasklet_struct *t)
 {
 	struct smc_ib_device *dev = from_tasklet(dev, t, recv_tasklet);
 	struct ib_wc wc[SMC_WR_MAX_POLL_CQE];
-	int polled = 0;
 	int rc;
 
 again:
-	polled++;
 	do {
 		memset(&wc, 0, sizeof(wc));
 		rc = ib_poll_cq(dev->roce_cq_recv, SMC_WR_MAX_POLL_CQE, wc);
-		if (polled == 1) {
-			ib_req_notify_cq(dev->roce_cq_recv,
-					 IB_CQ_SOLICITED_MASK
-					 | IB_CQ_REPORT_MISSED_EVENTS);
-		}
-		if (!rc)
-			break;
-		smc_wr_rx_process_cqes(&wc[0], rc);
+		if (rc > 0)
+			smc_wr_rx_process_cqes(&wc[0], rc);
+		if (rc < SMC_WR_MAX_POLL_CQE)
+			/* If < SMC_WR_MAX_POLL_CQE, the CQ should have been
+			 * drained, no need to poll again. --Guangguan Wang
+			 */
+			break;
 	} while (rc > 0);
-	if (polled == 1)
+
+	/* IB_CQ_REPORT_MISSED_EVENTS makes sure that if ib_req_notify_cq()
+	 * returns 0, it is safe to wait for the next event. Otherwise we
+	 * must poll the CQ again to make sure we won't miss any event.
+	 */
+	if (ib_req_notify_cq(dev->roce_cq_recv,
+			     IB_CQ_SOLICITED_MASK |
+			     IB_CQ_REPORT_MISSED_EVENTS))
 		goto again;
 }
...