Commit 86ef6250 authored by Amir Shehata, committed by Greg Kroah-Hartman

staging: lustre: Dynamic LNet Configuration (DLC) dynamic routing

This is the second patch of a set of patches that enables DLC.

This patch adds the following features to LNET.  Currently these
features are not driven by user space.
- Enabling routing on demand.  The default number of router
  buffers is allocated.
- Disabling routing on demand.  Unused router buffers are freed
  immediately, and in-use router buffers are freed once they are
  no longer in use.  The next time routing is enabled, the default
  router buffer values are used; it has been decided that the
  user-set router buffer values should be remembered and re-set
  by user space scripts.
- Increase the number of router buffers on demand, by allocating
  new ones.
- Decrease the number of router buffers.  Excess buffers are freed
  immediately if they are not in use; otherwise they are freed once
  they are no longer in use.  (A hedged usage sketch of the new
  entry points follows this list.)
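
The following sketch is illustrative only: it shows one way the new
entry points declared by this patch (lnet_rtrpools_enable(),
lnet_rtrpools_disable() and lnet_rtrpools_adjust()) might eventually
be driven.  The wrapper name, its parameters and the exact call
ordering are assumptions, not part of this patch.

/*
 * Hypothetical caller, for illustration only; this patch does not yet
 * connect the routing entry points to user space.
 */
static int
lnet_configure_routing(int enable, int tiny, int small, int large)
{
        int rc;

        if (!enable) {
                /* Stop routing; buffers are freed as they fall out of use. */
                lnet_rtrpools_disable();
                return 0;
        }

        /* Start routing with the default number of router buffers. */
        rc = lnet_rtrpools_enable();
        if (rc)
                return rc;

        /*
         * Resize the pools (assumed semantics); when shrinking, excess
         * buffers are freed only once they are no longer in use.
         */
        return lnet_rtrpools_adjust(tiny, small, large);
}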
Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2456
Change-Id: Id07d4ad424d8f5ba72475d4149380afe2ac54e77
Reviewed-on: http://review.whamcloud.com/9831
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Reviewed-by: Doug Oucharek <doug.s.oucharek@intel.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 21602c7d
@@ -461,7 +461,11 @@ int lnet_get_route(int idx, __u32 *net, __u32 *hops,
 void lnet_router_debugfs_init(void);
 void lnet_router_debugfs_fini(void);
 int lnet_rtrpools_alloc(int im_a_router);
-void lnet_rtrpools_free(void);
+void lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages);
+int lnet_rtrpools_adjust(int tiny, int small, int large);
+int lnet_rtrpools_enable(void);
+void lnet_rtrpools_disable(void);
+void lnet_rtrpools_free(int keep_pools);
 lnet_remotenet_t *lnet_find_net_locked(__u32 net);
 int lnet_islocalnid(lnet_nid_t nid);
@@ -481,6 +485,8 @@ void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg, lnet_nid_t rtr_nid);
 void lnet_return_tx_credits_locked(lnet_msg_t *msg);
 void lnet_return_rx_credits_locked(lnet_msg_t *msg);
+void lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp);
+void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt);
 
 /* portals functions */
 /* portals attributes */
@@ -285,6 +285,7 @@ typedef struct lnet_ni {
 #define LNET_PING_FEAT_INVAL (0) /* no feature */
 #define LNET_PING_FEAT_BASE (1 << 0) /* just a ping */
 #define LNET_PING_FEAT_NI_STATUS (1 << 1) /* return NI status */
+#define LNET_PING_FEAT_RTE_DISABLED (1 << 2) /* Routing enabled */
 #define LNET_PING_FEAT_MASK (LNET_PING_FEAT_BASE | \
                              LNET_PING_FEAT_NI_STATUS)
@@ -410,7 +411,12 @@ typedef struct {
 #define LNET_PEER_HASHSIZE 503 /* prime! */
 
-#define LNET_NRBPOOLS 3 /* # different router buffer pools */
+#define LNET_TINY_BUF_IDX 0
+#define LNET_SMALL_BUF_IDX 1
+#define LNET_LARGE_BUF_IDX 2
+
+/* # different router buffer pools */
+#define LNET_NRBPOOLS (LNET_LARGE_BUF_IDX + 1)
 
 enum {
         /* Didn't match anything */
@@ -638,7 +638,7 @@ lnet_unprepare(void)
         lnet_msg_containers_destroy();
         lnet_peer_tables_destroy();
 
-        lnet_rtrpools_free();
+        lnet_rtrpools_free(0);
 
         if (the_lnet.ln_counters) {
                 cfs_percpt_free(the_lnet.ln_counters);
@@ -1501,6 +1501,8 @@ lnet_create_ping_info(void)
         pinfo->pi_pid = the_lnet.ln_pid;
         pinfo->pi_magic = LNET_PROTO_PING_MAGIC;
         pinfo->pi_features = LNET_PING_FEAT_NI_STATUS;
+        if (!the_lnet.ln_routing)
+                pinfo->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
 
         for (i = 0; i < n; i++) {
                 lnet_ni_status_t *ns = &pinfo->pi_ni[i];
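
For illustration only, a hedged sketch of how a consumer of the ping
info might check the LNET_PING_FEAT_RTE_DISABLED bit set in the hunk
above; the helper name and its lnet_ping_info_t parameter are
assumptions, not part of this patch.

static int
ping_info_routing_enabled(const lnet_ping_info_t *pinfo)
{
        /* The bit is set by lnet_create_ping_info() only when routing
         * is disabled, so its absence means the node is willing to
         * route on behalf of its peers. */
        return !(pinfo->pi_features & LNET_PING_FEAT_RTE_DISABLED);
}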
@@ -945,9 +945,6 @@ lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
         rbp = lnet_msg2bufpool(msg);
 
         if (!msg->msg_rtrcredit) {
-                LASSERT((rbp->rbp_credits < 0) ==
-                        !list_empty(&rbp->rbp_msgs));
-
                 msg->msg_rtrcredit = 1;
                 rbp->rbp_credits--;
                 if (rbp->rbp_credits < rbp->rbp_mincredits)
@@ -1038,6 +1035,43 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
         }
 }
 
+void
+lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
+{
+        lnet_msg_t *msg;
+
+        if (list_empty(&rbp->rbp_msgs))
+                return;
+        msg = list_entry(rbp->rbp_msgs.next,
+                         lnet_msg_t, msg_list);
+        list_del(&msg->msg_list);
+
+        (void)lnet_post_routed_recv_locked(msg, 1);
+}
+
+void
+lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
+{
+        struct list_head drop;
+        lnet_msg_t *msg;
+        lnet_msg_t *tmp;
+
+        INIT_LIST_HEAD(&drop);
+
+        list_splice_init(list, &drop);
+
+        lnet_net_unlock(cpt);
+
+        list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+                lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
+                             0, 0, 0, msg->msg_hdr.payload_length);
+                list_del_init(&msg->msg_list);
+                lnet_finalize(NULL, msg, -ECANCELED);
+        }
+
+        lnet_net_lock(cpt);
+}
+
 void
 lnet_return_rx_credits_locked(lnet_msg_t *msg)
 {
@@ -1058,27 +1092,41 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
                 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
                 rbp = rb->rb_pool;
-                LASSERT(rbp == lnet_msg2bufpool(msg));
 
                 msg->msg_kiov = NULL;
                 msg->msg_rtrcredit = 0;
 
-                LASSERT((rbp->rbp_credits < 0) ==
-                        !list_empty(&rbp->rbp_msgs));
+                LASSERT(rbp == lnet_msg2bufpool(msg));
+
                 LASSERT((rbp->rbp_credits > 0) ==
                         !list_empty(&rbp->rbp_bufs));
 
-                list_add(&rb->rb_list, &rbp->rbp_bufs);
-                rbp->rbp_credits++;
-                if (rbp->rbp_credits <= 0) {
-                        msg2 = list_entry(rbp->rbp_msgs.next,
-                                          lnet_msg_t, msg_list);
-                        list_del(&msg2->msg_list);
+                /*
+                 * If routing is now turned off, we just drop this buffer and
+                 * don't bother trying to return credits.
+                 */
+                if (!the_lnet.ln_routing) {
+                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+                        goto routing_off;
+                }
 
-                        (void) lnet_post_routed_recv_locked(msg2, 1);
+                /*
+                 * It is possible that a user has lowered the desired number of
+                 * buffers in this pool.  Make sure we never put back
+                 * more buffers than the stated number.
+                 */
+                if (rbp->rbp_credits >= rbp->rbp_nbuffers) {
+                        /* Discard this buffer so we don't have too many. */
+                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+                } else {
+                        list_add(&rb->rb_list, &rbp->rbp_bufs);
+                        rbp->rbp_credits++;
+                        if (rbp->rbp_credits <= 0)
+                                lnet_schedule_blocked_locked(rbp);
                 }
         }
 
+routing_off:
         if (msg->msg_peerrtrcredit) {
                 /* give back peer router credits */
                 msg->msg_peerrtrcredit = 0;
@@ -1087,7 +1135,14 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
                         !list_empty(&rxpeer->lp_rtrq));
 
                 rxpeer->lp_rtrcredits++;
-                if (rxpeer->lp_rtrcredits <= 0) {
+                /*
+                 * drop all messages which are queued to be routed on that
+                 * peer.
+                 */
+                if (!the_lnet.ln_routing) {
+                        lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
+                                                     msg->msg_rx_cpt);
+                } else if (rxpeer->lp_rtrcredits <= 0) {
                         msg2 = list_entry(rxpeer->lp_rtrq.next,
                                           lnet_msg_t, msg_list);
                         list_del(&msg2->msg_list);
@@ -1646,6 +1701,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
 {
         int rc = 0;
 
+        if (!the_lnet.ln_routing)
+                return -ECANCELED;
+
         if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
             lnet_msg2bufpool(msg)->rbp_credits <= 0) {
                 if (!ni->ni_lnd->lnd_eager_recv) {
@@ -1799,9 +1857,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         if (the_lnet.ln_routing &&
             ni->ni_last_alive != ktime_get_real_seconds()) {
-                lnet_ni_lock(ni);
-
                 /* NB: so far here is the only place to set NI status to "up */
+                lnet_ni_lock(ni);
                 ni->ni_last_alive = ktime_get_real_seconds();
                 if (ni->ni_status &&
                     ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)