Commit 1d5d5ec1 authored by Fan Yong, committed by Greg Kroah-Hartman

staging: lustre: obdclass: unified flow control interfaces

Unify the flow control interfaces for MDC RPC and FLD RPC.
Allow the maximum number of in-flight RPCs to be adjusted via the
/sys interface.
Signed-off-by: Fan Yong <fan.yong@intel.com>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-4687
Reviewed-on: http://review.whamcloud.com/9562
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Signed-off-by: James Simmons <jsimmons@infradead.org>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 9ef3754c
......@@ -53,57 +53,6 @@
#include "../include/lustre_mdc.h"
#include "fld_internal.h"
/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c
* It should be common thing. The same about mdc RPC lock
*/
/*
 * Wait condition used by fld_enter_request().
 *
 * Samples, under cl_loi_list_lock, whether @mcw is still linked on a list.
 *
 * Returns non-zero once mcw->mcw_entry is an empty (unlinked) node,
 * zero while it is still queued.
 */
static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
{
	int unlinked;

	spin_lock(&cli->cl_loi_list_lock);
	unlinked = list_empty(&mcw->mcw_entry);
	spin_unlock(&cli->cl_loi_list_lock);

	return unlinked;
}
/*
 * Account one more FLD RPC against @cli before it is sent.
 *
 * If fewer than cl_max_rpcs_in_flight requests are outstanding, the
 * in-flight counter is bumped and the function returns immediately.
 * Otherwise an on-stack waiter is queued on cl_cache_waiters and the
 * caller waits until fld_req_avail() reports that the waiter has been
 * unlinked; fld_exit_request() increments the counter on the waiter's
 * behalf before waking it, so nothing is counted here on that path.
 */
static void fld_enter_request(struct client_obd *cli)
{
	struct l_wait_info lwi = { 0 };
	struct mdc_cache_waiter mcw;

	spin_lock(&cli->cl_loi_list_lock);

	/* Fast path: a slot is free, take it and go. */
	if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
		cli->cl_r_in_flight++;
		spin_unlock(&cli->cl_loi_list_lock);
		return;
	}

	/* Slow path: queue up behind the requests already in flight. */
	list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
	init_waitqueue_head(&mcw.mcw_waitq);
	spin_unlock(&cli->cl_loi_list_lock);

	l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi);
}
static void fld_exit_request(struct client_obd *cli)
{
struct list_head *l, *tmp;
struct mdc_cache_waiter *mcw;
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
/* No free request slots anymore */
break;
}
mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry);
list_del_init(&mcw->mcw_entry);
cli->cl_r_in_flight++;
wake_up(&mcw->mcw_waitq);
}
spin_unlock(&cli->cl_loi_list_lock);
}
static int fld_rrb_hash(struct lu_client_fld *fld, u64 seq)
{
LASSERT(fld->lcf_count > 0);
......@@ -439,9 +388,9 @@ int fld_client_rpc(struct obd_export *exp,
req->rq_reply_portal = MDC_REPLY_PORTAL;
ptlrpc_at_set_req_timeout(req);
fld_enter_request(&exp->exp_obd->u.cli);
obd_get_request_slot(&exp->exp_obd->u.cli);
rc = ptlrpc_queue_wait(req);
fld_exit_request(&exp->exp_obd->u.cli);
obd_put_request_slot(&exp->exp_obd->u.cli);
if (rc)
goto out_req;
......
......@@ -179,11 +179,6 @@ static inline void mdc_update_max_ea_from_body(struct obd_export *exp,
}
}
/*
 * On-stack record for a thread waiting for a free RPC slot.  Used by the
 * fld_*_request() and mdc_*_request() flow-control helpers.
 */
struct mdc_cache_waiter {
/* Link on the client's cl_cache_waiters list; empty once a slot is granted. */
struct list_head mcw_entry;
/* Wait queue the blocked thread sleeps on; woken by the *_exit_request() side. */
wait_queue_head_t mcw_waitq;
};
/* mdc/mdc_locks.c */
int it_open_error(int phase, struct lookup_intent *it);
......
......@@ -211,11 +211,12 @@ struct timeout_item {
struct list_head ti_chain;
};
#define OSC_MAX_RIF_DEFAULT 8
#define OSC_MAX_RIF_MAX 256
#define OSC_MAX_DIRTY_DEFAULT (OSC_MAX_RIF_DEFAULT * 4)
#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */
#define OSC_DEFAULT_RESENDS 10
#define OBD_MAX_RIF_DEFAULT 8
#define OBD_MAX_RIF_MAX 512
#define OSC_MAX_RIF_MAX 256
#define OSC_MAX_DIRTY_DEFAULT (OBD_MAX_RIF_DEFAULT * 4)
#define OSC_MAX_DIRTY_MB_MAX 2048 /* arbitrary, but < MAX_LONG bytes */
#define OSC_DEFAULT_RESENDS 10
/* possible values for fo_sync_lock_cancel */
enum {
......@@ -225,9 +226,6 @@ enum {
NUM_SYNC_ON_CANCEL_STATES
};
#define MDC_MAX_RIF_DEFAULT 8
#define MDC_MAX_RIF_MAX 512
enum obd_cl_sem_lock_class {
OBD_CLI_SEM_NORMAL,
OBD_CLI_SEM_MGC,
......
......@@ -97,6 +97,11 @@ int obd_zombie_impexp_init(void);
void obd_zombie_impexp_stop(void);
void obd_zombie_barrier(void);
int obd_get_request_slot(struct client_obd *cli);
void obd_put_request_slot(struct client_obd *cli);
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli);
int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max);
struct llog_handle;
struct llog_rec_hdr;
typedef int (*llog_cb_t)(const struct lu_env *, struct llog_handle *,
......
......@@ -360,7 +360,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
cli->cl_chunkbits = PAGE_SHIFT;
if (!strcmp(name, LUSTRE_MDC_NAME)) {
cli->cl_max_rpcs_in_flight = MDC_MAX_RIF_DEFAULT;
cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
} else if (totalram_pages >> (20 - PAGE_SHIFT) <= 128 /* MB */) {
cli->cl_max_rpcs_in_flight = 2;
} else if (totalram_pages >> (20 - PAGE_SHIFT) <= 256 /* MB */) {
......@@ -368,7 +368,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
} else if (totalram_pages >> (20 - PAGE_SHIFT) <= 512 /* MB */) {
cli->cl_max_rpcs_in_flight = 4;
} else {
cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
cli->cl_max_rpcs_in_flight = OBD_MAX_RIF_DEFAULT;
}
rc = ldlm_get_ref();
if (rc) {
......
......@@ -43,11 +43,10 @@ static ssize_t max_rpcs_in_flight_show(struct kobject *kobj,
int len;
struct obd_device *dev = container_of(kobj, struct obd_device,
obd_kobj);
struct client_obd *cli = &dev->u.cli;
__u32 max;
spin_lock(&cli->cl_loi_list_lock);
len = sprintf(buf, "%u\n", cli->cl_max_rpcs_in_flight);
spin_unlock(&cli->cl_loi_list_lock);
max = obd_get_max_rpcs_in_flight(&dev->u.cli);
len = sprintf(buf, "%u\n", max);
return len;
}
......@@ -59,7 +58,6 @@ static ssize_t max_rpcs_in_flight_store(struct kobject *kobj,
{
struct obd_device *dev = container_of(kobj, struct obd_device,
obd_kobj);
struct client_obd *cli = &dev->u.cli;
int rc;
unsigned long val;
......@@ -67,12 +65,9 @@ static ssize_t max_rpcs_in_flight_store(struct kobject *kobj,
if (rc)
return rc;
if (val < 1 || val > MDC_MAX_RIF_MAX)
return -ERANGE;
spin_lock(&cli->cl_loi_list_lock);
cli->cl_max_rpcs_in_flight = val;
spin_unlock(&cli->cl_loi_list_lock);
rc = obd_set_max_rpcs_in_flight(&dev->u.cli, val);
if (rc)
count = rc;
return count;
}
......
......@@ -61,8 +61,6 @@ void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
const char *old, int oldlen, const char *new, int newlen);
void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
int mdc_enter_request(struct client_obd *cli);
void mdc_exit_request(struct client_obd *cli);
/* mdc/mdc_locks.c */
int mdc_set_lock_data(struct obd_export *exp,
......
......@@ -484,67 +484,3 @@ void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data)
mdc_ioepoch_pack(epoch, op_data);
mdc_hsm_release_pack(req, op_data);
}
/*
 * Wait condition used by mdc_enter_request().
 *
 * Checks, with cl_loi_list_lock held, whether @mcw has been taken off
 * the waiters list.
 *
 * Returns non-zero when mcw->mcw_entry is an empty (unlinked) node,
 * zero otherwise.
 */
static int mdc_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw)
{
	int unlinked;

	spin_lock(&cli->cl_loi_list_lock);
	unlinked = list_empty(&mcw->mcw_entry);
	spin_unlock(&cli->cl_loi_list_lock);

	return unlinked;
}
/*
 * Reserve an RPC slot on @cli, waiting if the limit has been reached.
 *
 * Requests in flight are recorded in cli->cl_r_in_flight.  There is only
 * one write rpc possible in mdc anyway; if this is to change in the
 * future the code may need to be revisited.
 *
 * Returns 0 once a slot is held, or the non-zero result of
 * l_wait_event() when the wait did not complete normally; in that case
 * no slot is held on return.
 */
int mdc_enter_request(struct client_obd *cli)
{
	struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
	struct mdc_cache_waiter mcw;
	int rc;

	spin_lock(&cli->cl_loi_list_lock);

	/* Fast path: a slot is free. */
	if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
		cli->cl_r_in_flight++;
		spin_unlock(&cli->cl_loi_list_lock);
		return 0;
	}

	/* Slow path: queue an on-stack waiter and sleep until it is unlinked. */
	list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters);
	init_waitqueue_head(&mcw.mcw_waitq);
	spin_unlock(&cli->cl_loi_list_lock);

	rc = l_wait_event(mcw.mcw_waitq, mdc_req_avail(cli, &mcw), &lwi);
	if (!rc)
		return 0;

	/*
	 * The wait failed.  If the waiter was already unlinked, a slot had
	 * been counted for us, so give it back; either way make sure the
	 * on-stack entry is no longer on the list before returning.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	if (list_empty(&mcw.mcw_entry))
		cli->cl_r_in_flight--;
	list_del_init(&mcw.mcw_entry);
	spin_unlock(&cli->cl_loi_list_lock);

	return rc;
}
void mdc_exit_request(struct client_obd *cli)
{
struct list_head *l, *tmp;
struct mdc_cache_waiter *mcw;
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) {
/* No free request slots anymore */
break;
}
mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry);
list_del_init(&mcw->mcw_entry);
cli->cl_r_in_flight++;
wake_up(&mcw->mcw_waitq);
}
/* Empty waiting list? Decrease reqs in-flight number */
spin_unlock(&cli->cl_loi_list_lock);
}
......@@ -809,7 +809,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
*/
if (it) {
mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
rc = mdc_enter_request(&obddev->u.cli);
rc = obd_get_request_slot(&obddev->u.cli);
if (rc != 0) {
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
mdc_clear_replay_flag(req, 0);
......@@ -837,7 +837,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
return rc;
}
mdc_exit_request(&obddev->u.cli);
obd_put_request_slot(&obddev->u.cli);
mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
if (rc < 0) {
......@@ -1179,7 +1179,7 @@ static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
obddev = class_exp2obd(exp);
mdc_exit_request(&obddev->u.cli);
obd_put_request_slot(&obddev->u.cli);
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
rc = -ETIMEDOUT;
......@@ -1239,7 +1239,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
if (IS_ERR(req))
return PTR_ERR(req);
rc = mdc_enter_request(&obddev->u.cli);
rc = obd_get_request_slot(&obddev->u.cli);
if (rc != 0) {
ptlrpc_req_finished(req);
return rc;
......@@ -1248,7 +1248,7 @@ int mdc_intent_getattr_async(struct obd_export *exp,
rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
0, LVB_T_NONE, &minfo->mi_lockh, 1);
if (rc < 0) {
mdc_exit_request(&obddev->u.cli);
obd_put_request_slot(&obddev->u.cli);
ptlrpc_req_finished(req);
return rc;
}
......
......@@ -58,16 +58,16 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req)
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
int rc;
/* mdc_enter_request() ensures that this client has no more
/* obd_get_request_slot() ensures that this client has no more
* than cl_max_rpcs_in_flight RPCs simultaneously in flight
* against an MDT.
*/
rc = mdc_enter_request(cli);
rc = obd_get_request_slot(cli);
if (rc != 0)
return rc;
rc = ptlrpc_queue_wait(req);
mdc_exit_request(cli);
obd_put_request_slot(cli);
return rc;
}
......
......@@ -1312,3 +1312,135 @@ void obd_zombie_impexp_stop(void)
obd_zombie_impexp_notify();
wait_for_completion(&obd_zombie_stop);
}
/*
 * On-stack record for a thread blocked in obd_get_request_slot() waiting
 * for an RPC slot.  All fields are accessed under cl_loi_list_lock.
 */
struct obd_request_slot_waiter {
/* Link on the client's cl_loi_read_list; empty once a slot has been granted. */
struct list_head orsw_entry;
/* Wait queue the blocked thread sleeps on until it is granted a slot. */
wait_queue_head_t orsw_waitq;
/*
 * Initialised to false by obd_get_request_slot(); when found true after
 * the wait, that function returns -EINTR.
 * NOTE(review): no code visible here sets it to true - confirm the setter.
 */
bool orsw_signaled;
};
/*
 * Wait condition for obd_get_request_slot().
 *
 * Checks, with cl_loi_list_lock held, whether @orsw has been removed
 * from the waiters list, which is how obd_put_request_slot() and
 * obd_set_max_rpcs_in_flight() hand a slot to a waiter.
 *
 * Returns true when orsw->orsw_entry is unlinked, false while queued.
 */
static bool obd_request_slot_avail(struct client_obd *cli,
				   struct obd_request_slot_waiter *orsw)
{
	bool unlinked;

	spin_lock(&cli->cl_loi_list_lock);
	unlinked = list_empty(&orsw->orsw_entry) != 0;
	spin_unlock(&cli->cl_loi_list_lock);

	return unlinked;
}
/*
* For network flow control, the RPC sponsor needs to acquire a credit
* before sending the RPC. The credits count for a connection is defined
* by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
* the subsequent RPC sponsors need to wait until others released their
* credits, or the administrator increased the "cl_max_rpcs_in_flight".
*/
int obd_get_request_slot(struct client_obd *cli)
{
struct obd_request_slot_waiter orsw;
struct l_wait_info lwi;
int rc;
spin_lock(&cli->cl_loi_list_lock);
if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
cli->cl_r_in_flight++;
spin_unlock(&cli->cl_loi_list_lock);
return 0;
}
init_waitqueue_head(&orsw.orsw_waitq);
list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
orsw.orsw_signaled = false;
spin_unlock(&cli->cl_loi_list_lock);
lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
rc = l_wait_event(orsw.orsw_waitq,
obd_request_slot_avail(cli, &orsw) ||
orsw.orsw_signaled,
&lwi);
/*
* Here, we must take the lock to avoid the on-stack 'orsw' to be
* freed but other (such as obd_put_request_slot) is using it.
*/
spin_lock(&cli->cl_loi_list_lock);
if (rc) {
if (!orsw.orsw_signaled) {
if (list_empty(&orsw.orsw_entry))
cli->cl_r_in_flight--;
else
list_del(&orsw.orsw_entry);
}
}
if (orsw.orsw_signaled) {
LASSERT(list_empty(&orsw.orsw_entry));
rc = -EINTR;
}
spin_unlock(&cli->cl_loi_list_lock);
return rc;
}
EXPORT_SYMBOL(obd_get_request_slot);
void obd_put_request_slot(struct client_obd *cli)
{
struct obd_request_slot_waiter *orsw;
spin_lock(&cli->cl_loi_list_lock);
cli->cl_r_in_flight--;
/* If there is free slot, wakeup the first waiter. */
if (!list_empty(&cli->cl_loi_read_list) &&
likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
orsw = list_entry(cli->cl_loi_read_list.next,
struct obd_request_slot_waiter, orsw_entry);
list_del_init(&orsw->orsw_entry);
cli->cl_r_in_flight++;
wake_up(&orsw->orsw_waitq);
}
spin_unlock(&cli->cl_loi_list_lock);
}
EXPORT_SYMBOL(obd_put_request_slot);
/*
 * Report the current limit on concurrently in-flight RPCs for @cli.
 *
 * The field is read without taking cl_loi_list_lock.
 *
 * Returns the value of cli->cl_max_rpcs_in_flight.
 */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
	__u32 limit = cli->cl_max_rpcs_in_flight;

	return limit;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the limit on concurrently in-flight RPCs for @cli.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX].  After the new limit is stored,
 * queued waiters are admitted from the head of cl_loi_read_list for as
 * long as the in-flight count is below the new limit.
 *
 * Waiters are admitted based on the actual in-flight count rather than
 * on the difference between the new and old limits: if the limit had
 * previously been lowered below the number of RPCs already in flight,
 * raising it part-way must not admit waiters beyond the new limit.
 *
 * Returns 0 on success, -ERANGE when @max is out of range.
 */
int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
{
	struct obd_request_slot_waiter *orsw;

	if (max > OBD_MAX_RIF_MAX || max < 1)
		return -ERANGE;

	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_max_rpcs_in_flight = max;

	/* Hand every slot that is now free to the longest-waiting sponsors. */
	while (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight &&
	       !list_empty(&cli->cl_loi_read_list)) {
		orsw = list_entry(cli->cl_loi_read_list.next,
				  struct obd_request_slot_waiter, orsw_entry);
		list_del_init(&orsw->orsw_entry);
		cli->cl_r_in_flight++;
		wake_up(&orsw->orsw_waitq);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	return 0;
}
EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment