Commit 68e995a2 authored by Pradeep Satyanarayana's avatar Pradeep Satyanarayana Committed by Roland Dreier

IPoIB/cm: Add connected mode support for devices without SRQs

Some IB adapters (notably IBM's eHCA) do not implement SRQs (shared
receive queues).  The current IPoIB connected mode support only works
on devices that support SRQs.

Fix this by adding support for using the receive queue of each
connected mode receive QP.  The disadvantage of this compared to using
an SRQ is that it means a full queue of receives must be posted for
each remote connected mode peer, which means that total memory usage
is potentially much higher than when using SRQs.  To manage this, add
a new module parameter "max_nonsrq_conn_qp" that limits the number of
connections allowed per interface.

The rest of the changes are fairly straightforward: we use a table of
struct ipoib_cm_rx to hold all the active connections, and put the
table index of the connection in the high bits of receive WR IDs.
This is needed because we cannot rely on the struct ib_wc.qp field for
non-SRQ receive completions.  Most of the rest of the changes just
test whether or not an SRQ is available, and post receives or find
received packets in the right place depending on the answer.

Cleaning up dead connections actually becomes simpler, because we do
not have to do the "last WQE reached" dance that is required to
destroy QPs attached to an SRQ.  We just move the QP to the error
state and wait for all pending receives to be flushed.
Signed-off-by: default avatarPradeep Satyanarayana <pradeeps@linux.vnet.ibm.com>

[ Completely rewritten and split up, based on Pradeep's work.  Several
  bugs fixed and no doubt several bugs introduced.  - Roland ]
Signed-off-by: default avatarRoland Dreier <rolandd@cisco.com>
parent efcd9971
......@@ -69,6 +69,7 @@ enum {
IPOIB_TX_RING_SIZE = 64,
IPOIB_MAX_QUEUE_SIZE = 8192,
IPOIB_MIN_QUEUE_SIZE = 2,
IPOIB_CM_MAX_CONN_QP = 4096,
IPOIB_NUM_WC = 4,
......@@ -188,10 +189,12 @@ enum ipoib_cm_state {
struct ipoib_cm_rx {
struct ib_cm_id *id;
struct ib_qp *qp;
struct ipoib_cm_rx_buf *rx_ring;
struct list_head list;
struct net_device *dev;
unsigned long jiffies;
enum ipoib_cm_state state;
int recv_count;
};
struct ipoib_cm_tx {
......@@ -234,6 +237,7 @@ struct ipoib_cm_dev_priv {
struct ib_wc ibwc[IPOIB_NUM_WC];
struct ib_sge rx_sge[IPOIB_CM_RX_SG];
struct ib_recv_wr rx_wr;
int nonsrq_conn_qp;
};
/*
......@@ -461,6 +465,8 @@ void ipoib_drain_cq(struct net_device *dev);
/* We don't support UC connections at the moment */
#define IPOIB_CM_SUPPORTED(ha) (ha[0] & (IPOIB_FLAGS_RC))
extern int ipoib_max_conn_qp;
static inline int ipoib_cm_admin_enabled(struct net_device *dev)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
......@@ -491,6 +497,12 @@ static inline void ipoib_cm_set(struct ipoib_neigh *neigh, struct ipoib_cm_tx *t
neigh->cm = tx;
}
static inline int ipoib_cm_has_srq(struct net_device *dev)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
return !!priv->cm.srq;
}
void ipoib_cm_send(struct net_device *dev, struct sk_buff *skb, struct ipoib_cm_tx *tx);
int ipoib_cm_dev_open(struct net_device *dev);
void ipoib_cm_dev_stop(struct net_device *dev);
......@@ -508,6 +520,8 @@ void ipoib_cm_handle_tx_wc(struct net_device *dev, struct ib_wc *wc);
struct ipoib_cm_tx;
#define ipoib_max_conn_qp 0
static inline int ipoib_cm_admin_enabled(struct net_device *dev)
{
return 0;
......@@ -533,6 +547,11 @@ static inline void ipoib_cm_set(struct ipoib_neigh *neigh, struct ipoib_cm_tx *t
{
}
static inline int ipoib_cm_has_srq(struct net_device *dev)
{
return 0;
}
static inline
void ipoib_cm_send(struct net_device *dev, struct sk_buff *skb, struct ipoib_cm_tx *tx)
{
......
......@@ -39,6 +39,15 @@
#include <linux/icmpv6.h>
#include <linux/delay.h>
#include "ipoib.h"
int ipoib_max_conn_qp = 128;
module_param_named(max_nonsrq_conn_qp, ipoib_max_conn_qp, int, 0444);
MODULE_PARM_DESC(max_nonsrq_conn_qp,
"Max number of connected-mode QPs per interface "
"(applied only if shared receive queue is not available)");
#ifdef CONFIG_INFINIBAND_IPOIB_DEBUG_DATA
static int data_debug_level;
......@@ -47,8 +56,6 @@ MODULE_PARM_DESC(cm_data_debug_level,
"Enable data path debug tracing for connected mode if > 0");
#endif
#include "ipoib.h"
#define IPOIB_CM_IETF_ID 0x1000000000000000ULL
#define IPOIB_CM_RX_UPDATE_TIME (256 * HZ)
......@@ -81,7 +88,7 @@ static void ipoib_cm_dma_unmap_rx(struct ipoib_dev_priv *priv, int frags,
ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
}
static int ipoib_cm_post_receive(struct net_device *dev, int id)
static int ipoib_cm_post_receive_srq(struct net_device *dev, int id)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct ib_recv_wr *bad_wr;
......@@ -104,7 +111,33 @@ static int ipoib_cm_post_receive(struct net_device *dev, int id)
return ret;
}
static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
static int ipoib_cm_post_receive_nonsrq(struct net_device *dev,
struct ipoib_cm_rx *rx, int id)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct ib_recv_wr *bad_wr;
int i, ret;
priv->cm.rx_wr.wr_id = id | IPOIB_OP_CM | IPOIB_OP_RECV;
for (i = 0; i < IPOIB_CM_RX_SG; ++i)
priv->cm.rx_sge[i].addr = rx->rx_ring[id].mapping[i];
ret = ib_post_recv(rx->qp, &priv->cm.rx_wr, &bad_wr);
if (unlikely(ret)) {
ipoib_warn(priv, "post recv failed for buf %d (%d)\n", id, ret);
ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
rx->rx_ring[id].mapping);
dev_kfree_skb_any(rx->rx_ring[id].skb);
rx->rx_ring[id].skb = NULL;
}
return ret;
}
static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev,
struct ipoib_cm_rx_buf *rx_ring,
int id, int frags,
u64 mapping[IPOIB_CM_RX_SG])
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
......@@ -141,7 +174,7 @@ static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int
goto partial_error;
}
priv->cm.srq_ring[id].skb = skb;
rx_ring[id].skb = skb;
return skb;
partial_error:
......@@ -224,6 +257,12 @@ static struct ib_qp *ipoib_cm_create_rx_qp(struct net_device *dev,
.qp_type = IB_QPT_RC,
.qp_context = p,
};
if (!ipoib_cm_has_srq(dev)) {
attr.cap.max_recv_wr = ipoib_recvq_size;
attr.cap.max_recv_sge = IPOIB_CM_RX_SG;
}
return ib_create_qp(priv->pd, &attr);
}
......@@ -282,6 +321,60 @@ static int ipoib_cm_modify_rx_qp(struct net_device *dev,
return 0;
}
static int ipoib_cm_nonsrq_init_rx(struct net_device *dev, struct ib_cm_id *cm_id,
struct ipoib_cm_rx *rx)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
int ret;
int i;
rx->rx_ring = kcalloc(ipoib_recvq_size, sizeof *rx->rx_ring, GFP_KERNEL);
if (!rx->rx_ring)
return -ENOMEM;
spin_lock_irq(&priv->lock);
if (priv->cm.nonsrq_conn_qp >= ipoib_max_conn_qp) {
spin_unlock_irq(&priv->lock);
ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
ret = -EINVAL;
goto err_free;
} else
++priv->cm.nonsrq_conn_qp;
spin_unlock_irq(&priv->lock);
for (i = 0; i < ipoib_recvq_size; ++i) {
if (!ipoib_cm_alloc_rx_skb(dev, rx->rx_ring, i, IPOIB_CM_RX_SG - 1,
rx->rx_ring[i].mapping)) {
ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
ret = -ENOMEM;
goto err_count;
}
ret = ipoib_cm_post_receive_nonsrq(dev, rx, i);
if (ret) {
ipoib_warn(priv, "ipoib_cm_post_receive_nonsrq "
"failed for buf %d\n", i);
ret = -EIO;
goto err_count;
}
}
rx->recv_count = ipoib_recvq_size;
return 0;
err_count:
spin_lock_irq(&priv->lock);
--priv->cm.nonsrq_conn_qp;
spin_unlock_irq(&priv->lock);
err_free:
ipoib_cm_free_rx_ring(dev, rx->rx_ring);
return ret;
}
static int ipoib_cm_send_rep(struct net_device *dev, struct ib_cm_id *cm_id,
struct ib_qp *qp, struct ib_cm_req_event_param *req,
unsigned psn)
......@@ -297,7 +390,7 @@ static int ipoib_cm_send_rep(struct net_device *dev, struct ib_cm_id *cm_id,
rep.private_data_len = sizeof data;
rep.flow_control = 0;
rep.rnr_retry_count = req->rnr_retry_count;
rep.srq = 1;
rep.srq = ipoib_cm_has_srq(dev);
rep.qp_num = qp->qp_num;
rep.starting_psn = psn;
return ib_send_cm_rep(cm_id, &rep);
......@@ -333,6 +426,12 @@ static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
if (ret)
goto err_modify;
if (!ipoib_cm_has_srq(dev)) {
ret = ipoib_cm_nonsrq_init_rx(dev, cm_id, p);
if (ret)
goto err_modify;
}
spin_lock_irq(&priv->lock);
queue_delayed_work(ipoib_workqueue,
&priv->cm.stale_task, IPOIB_CM_RX_DELAY);
......@@ -417,12 +516,14 @@ static void skb_put_frags(struct sk_buff *skb, unsigned int hdr_space,
void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct ipoib_cm_rx_buf *rx_ring;
unsigned int wr_id = wc->wr_id & ~(IPOIB_OP_CM | IPOIB_OP_RECV);
struct sk_buff *skb, *newskb;
struct ipoib_cm_rx *p;
unsigned long flags;
u64 mapping[IPOIB_CM_RX_SG];
int frags;
int has_srq;
ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
wr_id, wc->status);
......@@ -440,18 +541,32 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
return;
}
skb = priv->cm.srq_ring[wr_id].skb;
p = wc->qp->qp_context;
has_srq = ipoib_cm_has_srq(dev);
rx_ring = has_srq ? priv->cm.srq_ring : p->rx_ring;
skb = rx_ring[wr_id].skb;
if (unlikely(wc->status != IB_WC_SUCCESS)) {
ipoib_dbg(priv, "cm recv error "
"(status=%d, wrid=%d vend_err %x)\n",
wc->status, wr_id, wc->vendor_err);
++dev->stats.rx_dropped;
if (has_srq)
goto repost;
else {
if (!--p->recv_count) {
spin_lock_irqsave(&priv->lock, flags);
list_move(&p->list, &priv->cm.rx_reap_list);
spin_unlock_irqrestore(&priv->lock, flags);
queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
}
return;
}
}
if (unlikely(!(wr_id & IPOIB_CM_RX_UPDATE_MASK))) {
p = wc->qp->qp_context;
if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
spin_lock_irqsave(&priv->lock, flags);
p->jiffies = jiffies;
......@@ -466,7 +581,7 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
(unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
newskb = ipoib_cm_alloc_rx_skb(dev, wr_id, frags, mapping);
newskb = ipoib_cm_alloc_rx_skb(dev, rx_ring, wr_id, frags, mapping);
if (unlikely(!newskb)) {
/*
* If we can't allocate a new RX buffer, dump
......@@ -477,8 +592,8 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
goto repost;
}
ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
ipoib_cm_dma_unmap_rx(priv, frags, rx_ring[wr_id].mapping);
memcpy(rx_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
wc->byte_len, wc->slid);
......@@ -499,9 +614,17 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
netif_receive_skb(skb);
repost:
if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
ipoib_warn(priv, "ipoib_cm_post_receive failed "
if (has_srq) {
if (unlikely(ipoib_cm_post_receive_srq(dev, wr_id)))
ipoib_warn(priv, "ipoib_cm_post_receive_srq failed "
"for buf %d\n", wr_id);
} else {
if (unlikely(ipoib_cm_post_receive_nonsrq(dev, p, wr_id))) {
--p->recv_count;
ipoib_warn(priv, "ipoib_cm_post_receive_nonsrq failed "
"for buf %d\n", wr_id);
}
}
}
static inline int post_send(struct ipoib_dev_priv *priv,
......@@ -686,6 +809,12 @@ static void ipoib_cm_free_rx_reap_list(struct net_device *dev)
list_for_each_entry_safe(rx, n, &list, list) {
ib_destroy_cm_id(rx->id);
ib_destroy_qp(rx->qp);
if (!ipoib_cm_has_srq(dev)) {
ipoib_cm_free_rx_ring(priv->dev, rx->rx_ring);
spin_lock_irq(&priv->lock);
--priv->cm.nonsrq_conn_qp;
spin_unlock_irq(&priv->lock);
}
kfree(rx);
}
}
......@@ -864,7 +993,7 @@ static int ipoib_cm_send_req(struct net_device *dev,
req.retry_count = 0; /* RFC draft warns against retries */
req.rnr_retry_count = 0; /* RFC draft warns against retries */
req.max_cm_retries = 15;
req.srq = 1;
req.srq = ipoib_cm_has_srq(dev);
return ib_send_cm_req(id, &req);
}
......@@ -1270,7 +1399,7 @@ int ipoib_cm_add_mode_attr(struct net_device *dev)
return device_create_file(&dev->dev, &dev_attr_mode);
}
static int ipoib_cm_create_srq(struct net_device *dev)
static void ipoib_cm_create_srq(struct net_device *dev)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
struct ib_srq_init_attr srq_init_attr = {
......@@ -1279,32 +1408,30 @@ static int ipoib_cm_create_srq(struct net_device *dev)
.max_sge = IPOIB_CM_RX_SG
}
};
int ret;
priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
if (IS_ERR(priv->cm.srq)) {
ret = PTR_ERR(priv->cm.srq);
if (PTR_ERR(priv->cm.srq) != -ENOSYS)
printk(KERN_WARNING "%s: failed to allocate SRQ, error %ld\n",
priv->ca->name, PTR_ERR(priv->cm.srq));
priv->cm.srq = NULL;
return ret;
return;
}
priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
GFP_KERNEL);
if (!priv->cm.srq_ring) {
printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
printk(KERN_WARNING "%s: failed to allocate CM SRQ ring (%d entries)\n",
priv->ca->name, ipoib_recvq_size);
ib_destroy_srq(priv->cm.srq);
priv->cm.srq = NULL;
return -ENOMEM;
}
return 0;
}
int ipoib_cm_dev_init(struct net_device *dev)
{
struct ipoib_dev_priv *priv = netdev_priv(dev);
int ret, i;
int i;
INIT_LIST_HEAD(&priv->cm.passive_ids);
INIT_LIST_HEAD(&priv->cm.reap_list);
......@@ -1331,23 +1458,27 @@ int ipoib_cm_dev_init(struct net_device *dev)
priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
ret = ipoib_cm_create_srq(dev);
if (ret)
return ret;
ipoib_cm_create_srq(dev);
if (ipoib_cm_has_srq(dev)) {
for (i = 0; i < ipoib_recvq_size; ++i) {
if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
if (!ipoib_cm_alloc_rx_skb(dev, priv->cm.srq_ring, i,
IPOIB_CM_RX_SG - 1,
priv->cm.srq_ring[i].mapping)) {
ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
ipoib_warn(priv, "failed to allocate "
"receive buffer %d\n", i);
ipoib_cm_dev_cleanup(dev);
return -ENOMEM;
}
if (ipoib_cm_post_receive(dev, i)) {
ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
if (ipoib_cm_post_receive_srq(dev, i)) {
ipoib_warn(priv, "ipoib_cm_post_receive_srq "
"failed for buf %d\n", i);
ipoib_cm_dev_cleanup(dev);
return -EIO;
}
}
}
priv->dev->dev_addr[0] = IPOIB_FLAGS_RC;
return 0;
......
......@@ -1268,6 +1268,9 @@ static int __init ipoib_init_module(void)
ipoib_sendq_size = roundup_pow_of_two(ipoib_sendq_size);
ipoib_sendq_size = min(ipoib_sendq_size, IPOIB_MAX_QUEUE_SIZE);
ipoib_sendq_size = max(ipoib_sendq_size, IPOIB_MIN_QUEUE_SIZE);
#ifdef CONFIG_INFINIBAND_IPOIB_CM
ipoib_max_conn_qp = min(ipoib_max_conn_qp, IPOIB_CM_MAX_CONN_QP);
#endif
ret = ipoib_register_debugfs();
if (ret)
......
......@@ -172,8 +172,12 @@ int ipoib_transport_dev_init(struct net_device *dev, struct ib_device *ca)
size = ipoib_sendq_size + ipoib_recvq_size + 1;
ret = ipoib_cm_dev_init(dev);
if (!ret)
size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
if (!ret) {
if (ipoib_cm_has_srq(dev))
size += ipoib_recvq_size + 1; /* 1 extra for rx_drain_qp */
else
size += ipoib_recvq_size * ipoib_max_conn_qp;
}
priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
if (IS_ERR(priv->cq)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment