Commit 8d69af03 authored by Amir Shehata's avatar Amir Shehata Committed by Greg Kroah-Hartman

staging: lustre: ptlrpc: Introduce iovec to bulk descriptor

Added a union to ptlrpc_bulk_desc for KVEC and KIOV buffers.
bd_type has been changed to be a bit mask. Bits are set in
bd_type to specify {put,get}{source,sink}{kvec,kiov}
changed all instances in the code to access the union properly
ASSUMPTION: all current code only works with KIOV and new DNE code
to be added will introduce a different code path that uses IOVEC

As part of the implementation buffer operations are added
as callbacks to be defined by users of ptlrpc.  This enables
buffer users to define how to allocate and release buffers.
Signed-off-by: default avatarAmir Shehata <amir.shehata@intel.com>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-5835
Reviewed-on: http://review.whamcloud.com/12525Reviewed-by: default avatarwangdi <di.wang@intel.com>
Reviewed-by: default avatarLiang Zhen <liang.zhen@intel.com>
Reviewed-by: default avatarJames Simmons <uja.ornl@yahoo.com>
Reviewed-by: default avatarOleg Drokin <oleg.drokin@intel.com>
Signed-off-by: default avatarJames Simmons <jsimmons@infradead.org>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 662e035c
...@@ -50,6 +50,7 @@ ...@@ -50,6 +50,7 @@
* @{ * @{
*/ */
#include <linux/uio.h>
#include "../../include/linux/libcfs/libcfs.h" #include "../../include/linux/libcfs/libcfs.h"
#include "../../include/linux/lnet/nidstr.h" #include "../../include/linux/lnet/nidstr.h"
#include "../../include/linux/lnet/api.h" #include "../../include/linux/lnet/api.h"
...@@ -1083,10 +1084,93 @@ struct ptlrpc_bulk_page { ...@@ -1083,10 +1084,93 @@ struct ptlrpc_bulk_page {
struct page *bp_page; struct page *bp_page;
}; };
#define BULK_GET_SOURCE 0 enum ptlrpc_bulk_op_type {
#define BULK_PUT_SINK 1 PTLRPC_BULK_OP_ACTIVE = 0x00000001,
#define BULK_GET_SINK 2 PTLRPC_BULK_OP_PASSIVE = 0x00000002,
#define BULK_PUT_SOURCE 3 PTLRPC_BULK_OP_PUT = 0x00000004,
PTLRPC_BULK_OP_GET = 0x00000008,
PTLRPC_BULK_BUF_KVEC = 0x00000010,
PTLRPC_BULK_BUF_KIOV = 0x00000020,
PTLRPC_BULK_GET_SOURCE = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_GET,
PTLRPC_BULK_PUT_SINK = PTLRPC_BULK_OP_PASSIVE | PTLRPC_BULK_OP_PUT,
PTLRPC_BULK_GET_SINK = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_GET,
PTLRPC_BULK_PUT_SOURCE = PTLRPC_BULK_OP_ACTIVE | PTLRPC_BULK_OP_PUT,
};
static inline bool ptlrpc_is_bulk_op_get(enum ptlrpc_bulk_op_type type)
{
return (type & PTLRPC_BULK_OP_GET) == PTLRPC_BULK_OP_GET;
}
static inline bool ptlrpc_is_bulk_get_source(enum ptlrpc_bulk_op_type type)
{
return (type & PTLRPC_BULK_GET_SOURCE) == PTLRPC_BULK_GET_SOURCE;
}
static inline bool ptlrpc_is_bulk_put_sink(enum ptlrpc_bulk_op_type type)
{
return (type & PTLRPC_BULK_PUT_SINK) == PTLRPC_BULK_PUT_SINK;
}
static inline bool ptlrpc_is_bulk_get_sink(enum ptlrpc_bulk_op_type type)
{
return (type & PTLRPC_BULK_GET_SINK) == PTLRPC_BULK_GET_SINK;
}
static inline bool ptlrpc_is_bulk_put_source(enum ptlrpc_bulk_op_type type)
{
return (type & PTLRPC_BULK_PUT_SOURCE) == PTLRPC_BULK_PUT_SOURCE;
}
static inline bool ptlrpc_is_bulk_desc_kvec(enum ptlrpc_bulk_op_type type)
{
return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
== PTLRPC_BULK_BUF_KVEC;
}
static inline bool ptlrpc_is_bulk_desc_kiov(enum ptlrpc_bulk_op_type type)
{
return ((type & PTLRPC_BULK_BUF_KVEC) | (type & PTLRPC_BULK_BUF_KIOV))
== PTLRPC_BULK_BUF_KIOV;
}
static inline bool ptlrpc_is_bulk_op_active(enum ptlrpc_bulk_op_type type)
{
return ((type & PTLRPC_BULK_OP_ACTIVE) |
(type & PTLRPC_BULK_OP_PASSIVE)) == PTLRPC_BULK_OP_ACTIVE;
}
static inline bool ptlrpc_is_bulk_op_passive(enum ptlrpc_bulk_op_type type)
{
return ((type & PTLRPC_BULK_OP_ACTIVE) |
(type & PTLRPC_BULK_OP_PASSIVE)) == PTLRPC_BULK_OP_PASSIVE;
}
struct ptlrpc_bulk_frag_ops {
/**
* Add a page \a page to the bulk descriptor \a desc
* Data to transfer in the page starts at offset \a pageoffset and
* amount of data to transfer from the page is \a len
*/
void (*add_kiov_frag)(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset, int len);
/*
* Add a \a fragment to the bulk descriptor \a desc.
* Data to transfer in the fragment is pointed to by \a frag
* The size of the fragment is \a len
*/
int (*add_iov_frag)(struct ptlrpc_bulk_desc *desc, void *frag, int len);
/**
* Uninitialize and free bulk descriptor \a desc.
* Works on bulk descriptors both from server and client side.
*/
void (*release_frags)(struct ptlrpc_bulk_desc *desc);
};
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
/** /**
* Definition of bulk descriptor. * Definition of bulk descriptor.
...@@ -1101,14 +1185,14 @@ struct ptlrpc_bulk_page { ...@@ -1101,14 +1185,14 @@ struct ptlrpc_bulk_page {
struct ptlrpc_bulk_desc { struct ptlrpc_bulk_desc {
/** completed with failure */ /** completed with failure */
unsigned long bd_failure:1; unsigned long bd_failure:1;
/** {put,get}{source,sink} */
unsigned long bd_type:2;
/** client side */ /** client side */
unsigned long bd_registered:1; unsigned long bd_registered:1;
/** For serialization with callback */ /** For serialization with callback */
spinlock_t bd_lock; spinlock_t bd_lock;
/** Import generation when request for this bulk was sent */ /** Import generation when request for this bulk was sent */
int bd_import_generation; int bd_import_generation;
/** {put,get}{source,sink}{kvec,kiov} */
enum ptlrpc_bulk_op_type bd_type;
/** LNet portal for this bulk */ /** LNet portal for this bulk */
__u32 bd_portal; __u32 bd_portal;
/** Server side - export this bulk created for */ /** Server side - export this bulk created for */
...@@ -1117,6 +1201,7 @@ struct ptlrpc_bulk_desc { ...@@ -1117,6 +1201,7 @@ struct ptlrpc_bulk_desc {
struct obd_import *bd_import; struct obd_import *bd_import;
/** Back pointer to the request */ /** Back pointer to the request */
struct ptlrpc_request *bd_req; struct ptlrpc_request *bd_req;
struct ptlrpc_bulk_frag_ops *bd_frag_ops;
wait_queue_head_t bd_waitq; /* server side only WQ */ wait_queue_head_t bd_waitq; /* server side only WQ */
int bd_iov_count; /* # entries in bd_iov */ int bd_iov_count; /* # entries in bd_iov */
int bd_max_iov; /* allocated size of bd_iov */ int bd_max_iov; /* allocated size of bd_iov */
...@@ -1132,14 +1217,31 @@ struct ptlrpc_bulk_desc { ...@@ -1132,14 +1217,31 @@ struct ptlrpc_bulk_desc {
/** array of associated MDs */ /** array of associated MDs */
lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT]; lnet_handle_md_t bd_mds[PTLRPC_BULK_OPS_COUNT];
/* union {
* encrypt iov, size is either 0 or bd_iov_count. struct {
*/ /*
lnet_kiov_t *bd_enc_iov; * encrypt iov, size is either 0 or bd_iov_count.
*/
lnet_kiov_t bd_iov[0]; struct bio_vec *bd_enc_vec;
struct bio_vec *bd_vec; /* Array of bio_vecs */
} bd_kiov;
struct {
struct kvec *bd_enc_kvec;
struct kvec *bd_kvec; /* Array of kvecs */
} bd_kvec;
} bd_u;
}; };
#define GET_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_vec)
#define BD_GET_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_vec[i])
#define GET_ENC_KIOV(desc) ((desc)->bd_u.bd_kiov.bd_enc_vec)
#define BD_GET_ENC_KIOV(desc, i) ((desc)->bd_u.bd_kiov.bd_enc_vec[i])
#define GET_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_kvec)
#define BD_GET_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_kvec[i])
#define GET_ENC_KVEC(desc) ((desc)->bd_u.bd_kvec.bd_enc_kvec)
#define BD_GET_ENC_KVEC(desc, i) ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
enum { enum {
SVC_STOPPED = 1 << 0, SVC_STOPPED = 1 << 0,
SVC_STOPPING = 1 << 1, SVC_STOPPING = 1 << 1,
...@@ -1754,21 +1856,17 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request, ...@@ -1754,21 +1856,17 @@ int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
void ptlrpc_req_finished(struct ptlrpc_request *request); void ptlrpc_req_finished(struct ptlrpc_request *request);
struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req); struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
unsigned npages, unsigned max_brw, unsigned int nfrags,
unsigned type, unsigned portal); unsigned int max_brw,
void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin); unsigned int type,
static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk) unsigned int portal,
{ const struct ptlrpc_bulk_frag_ops *ops);
__ptlrpc_free_bulk(bulk, 1);
} int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
void *frag, int len);
static inline void ptlrpc_free_bulk_nopin(struct ptlrpc_bulk_desc *bulk)
{
__ptlrpc_free_bulk(bulk, 0);
}
void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset, int len, int); struct page *page, int pageoffset, int len,
int pin);
static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc, static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset, struct page *page, int pageoffset,
int len) int len)
...@@ -1783,6 +1881,16 @@ static inline void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc, ...@@ -1783,6 +1881,16 @@ static inline void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
__ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0); __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
} }
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
static inline void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
{
int i;
for (i = 0; i < desc->bd_iov_count ; i++)
put_page(BD_GET_KIOV(desc, i).bv_page);
}
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp); struct obd_import *imp);
__u64 ptlrpc_next_xid(void); __u64 ptlrpc_next_xid(void);
......
...@@ -859,8 +859,10 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, ...@@ -859,8 +859,10 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
req->rq_request_portal = MDS_READPAGE_PORTAL; req->rq_request_portal = MDS_READPAGE_PORTAL;
ptlrpc_at_set_req_timeout(req); ptlrpc_at_set_req_timeout(req);
desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, desc = ptlrpc_prep_bulk_imp(req, npages, 1,
MDS_BULK_PORTAL); PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
MDS_BULK_PORTAL,
&ptlrpc_bulk_kiov_pin_ops);
if (!desc) { if (!desc) {
ptlrpc_request_free(req); ptlrpc_request_free(req);
return -ENOMEM; return -ENOMEM;
...@@ -868,7 +870,7 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, ...@@ -868,7 +870,7 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid,
/* NB req now owns desc and will free it when it gets freed */ /* NB req now owns desc and will free it when it gets freed */
for (i = 0; i < npages; i++) for (i = 0; i < npages; i++)
ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_SIZE); desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0, PAGE_SIZE);
mdc_readdir_pack(req, offset, PAGE_SIZE * npages, fid); mdc_readdir_pack(req, offset, PAGE_SIZE * npages, fid);
......
...@@ -1386,15 +1386,17 @@ static int mgc_process_recover_log(struct obd_device *obd, ...@@ -1386,15 +1386,17 @@ static int mgc_process_recover_log(struct obd_device *obd,
body->mcb_units = nrpages; body->mcb_units = nrpages;
/* allocate bulk transfer descriptor */ /* allocate bulk transfer descriptor */
desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK, desc = ptlrpc_prep_bulk_imp(req, nrpages, 1,
MGS_BULK_PORTAL); PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
MGS_BULK_PORTAL,
&ptlrpc_bulk_kiov_pin_ops);
if (!desc) { if (!desc) {
rc = -ENOMEM; rc = -ENOMEM;
goto out; goto out;
} }
for (i = 0; i < nrpages; i++) for (i = 0; i < nrpages; i++)
ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_SIZE); desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0, PAGE_SIZE);
ptlrpc_request_set_replen(req); ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req); rc = ptlrpc_queue_wait(req);
......
...@@ -776,8 +776,10 @@ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc, ...@@ -776,8 +776,10 @@ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc,
int count = 0; int count = 0;
int i; int i;
LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
for (i = 0; i < page_count; i++) { for (i = 0; i < page_count; i++) {
pg_data_t *pgdat = page_pgdat(desc->bd_iov[i].bv_page); pg_data_t *pgdat = page_pgdat(BD_GET_KIOV(desc, i).bv_page);
if (likely(pgdat == last)) { if (likely(pgdat == last)) {
++count; ++count;
......
...@@ -1030,8 +1030,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, ...@@ -1030,8 +1030,9 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
desc = ptlrpc_prep_bulk_imp(req, page_count, desc = ptlrpc_prep_bulk_imp(req, page_count,
cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, (opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
OST_BULK_PORTAL); PTLRPC_BULK_PUT_SINK) | PTLRPC_BULK_BUF_KIOV, OST_BULK_PORTAL,
&ptlrpc_bulk_kiov_pin_ops);
if (!desc) { if (!desc) {
rc = -ENOMEM; rc = -ENOMEM;
...@@ -1079,7 +1080,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli, ...@@ -1079,7 +1080,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK)); (pg->flag & OBD_BRW_SRVLOCK));
ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count); desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
requested_nob += pg->count; requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) { if (i > 0 && can_merge_pages(pg_prev, pg)) {
......
...@@ -43,6 +43,18 @@ ...@@ -43,6 +43,18 @@
#include "ptlrpc_internal.h" #include "ptlrpc_internal.h"
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_pin,
.release_frags = ptlrpc_release_bulk_page_pin,
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
.add_kiov_frag = ptlrpc_prep_bulk_page_nopin,
.release_frags = NULL,
};
EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
static int ptlrpc_send_new_req(struct ptlrpc_request *req); static int ptlrpc_send_new_req(struct ptlrpc_request *req);
static int ptlrpcd_check_work(struct ptlrpc_request *req); static int ptlrpcd_check_work(struct ptlrpc_request *req);
static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async); static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
...@@ -95,24 +107,43 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) ...@@ -95,24 +107,43 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
* Allocate and initialize new bulk descriptor on the sender. * Allocate and initialize new bulk descriptor on the sender.
* Returns pointer to the descriptor or NULL on error. * Returns pointer to the descriptor or NULL on error.
*/ */
struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
unsigned type, unsigned portal) unsigned int max_brw,
enum ptlrpc_bulk_op_type type,
unsigned int portal,
const struct ptlrpc_bulk_frag_ops *ops)
{ {
struct ptlrpc_bulk_desc *desc; struct ptlrpc_bulk_desc *desc;
int i; int i;
desc = kzalloc(offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]), /* ensure that only one of KIOV or IOVEC is set but not both */
GFP_NOFS); LASSERT((ptlrpc_is_bulk_desc_kiov(type) && ops->add_kiov_frag) ||
(ptlrpc_is_bulk_desc_kvec(type) && ops->add_iov_frag));
desc = kzalloc(sizeof(*desc), GFP_NOFS);
if (!desc) if (!desc)
return NULL; return NULL;
if (type & PTLRPC_BULK_BUF_KIOV) {
GET_KIOV(desc) = kcalloc(nfrags, sizeof(*GET_KIOV(desc)),
GFP_NOFS);
if (!GET_KIOV(desc))
goto out;
} else {
GET_KVEC(desc) = kcalloc(nfrags, sizeof(*GET_KVEC(desc)),
GFP_NOFS);
if (!GET_KVEC(desc))
goto out;
}
spin_lock_init(&desc->bd_lock); spin_lock_init(&desc->bd_lock);
init_waitqueue_head(&desc->bd_waitq); init_waitqueue_head(&desc->bd_waitq);
desc->bd_max_iov = npages; desc->bd_max_iov = nfrags;
desc->bd_iov_count = 0; desc->bd_iov_count = 0;
desc->bd_portal = portal; desc->bd_portal = portal;
desc->bd_type = type; desc->bd_type = type;
desc->bd_md_count = 0; desc->bd_md_count = 0;
desc->bd_frag_ops = (struct ptlrpc_bulk_frag_ops *)ops;
LASSERT(max_brw > 0); LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
/* /*
...@@ -123,24 +154,30 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, ...@@ -123,24 +154,30 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
LNetInvalidateHandle(&desc->bd_mds[i]); LNetInvalidateHandle(&desc->bd_mds[i]);
return desc; return desc;
out:
return NULL;
} }
/** /**
* Prepare bulk descriptor for specified outgoing request \a req that * Prepare bulk descriptor for specified outgoing request \a req that
* can fit \a npages * pages. \a type is bulk type. \a portal is where * can fit \a nfrags * pages. \a type is bulk type. \a portal is where
* the bulk to be sent. Used on client-side. * the bulk to be sent. Used on client-side.
* Returns pointer to newly allocated initialized bulk descriptor or NULL on * Returns pointer to newly allocated initialized bulk descriptor or NULL on
* error. * error.
*/ */
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
unsigned npages, unsigned max_brw, unsigned int nfrags,
unsigned type, unsigned portal) unsigned int max_brw,
unsigned int type,
unsigned int portal,
const struct ptlrpc_bulk_frag_ops *ops)
{ {
struct obd_import *imp = req->rq_import; struct obd_import *imp = req->rq_import;
struct ptlrpc_bulk_desc *desc; struct ptlrpc_bulk_desc *desc;
LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE); LASSERT(ptlrpc_is_bulk_op_passive(type));
desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops);
if (!desc) if (!desc)
return NULL; return NULL;
...@@ -158,56 +195,82 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req, ...@@ -158,56 +195,82 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
} }
EXPORT_SYMBOL(ptlrpc_prep_bulk_imp); EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
/**
* Add a page \a page to the bulk descriptor \a desc.
* Data to transfer in the page starts at offset \a pageoffset and
* amount of data to transfer from the page is \a len
*/
void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc, void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
struct page *page, int pageoffset, int len, int pin) struct page *page, int pageoffset, int len, int pin)
{ {
struct bio_vec *kiov;
LASSERT(desc->bd_iov_count < desc->bd_max_iov); LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(page); LASSERT(page);
LASSERT(pageoffset >= 0); LASSERT(pageoffset >= 0);
LASSERT(len > 0); LASSERT(len > 0);
LASSERT(pageoffset + len <= PAGE_SIZE); LASSERT(pageoffset + len <= PAGE_SIZE);
LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
desc->bd_nob += len; desc->bd_nob += len;
if (pin) if (pin)
get_page(page); get_page(page);
ptlrpc_add_bulk_page(desc, page, pageoffset, len); kiov->bv_page = page;
kiov->bv_offset = pageoffset;
kiov->bv_len = len;
desc->bd_iov_count++;
} }
EXPORT_SYMBOL(__ptlrpc_prep_bulk_page); EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
/** int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
* Uninitialize and free bulk descriptor \a desc. void *frag, int len)
* Works on bulk descriptors both from server and client side.
*/
void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
{ {
int i; struct kvec *iovec;
LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(frag);
LASSERT(len > 0);
LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
desc->bd_nob += len;
iovec->iov_base = frag;
iovec->iov_len = len;
desc->bd_iov_count++;
return desc->bd_nob;
}
EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */ LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
LASSERT(desc->bd_md_count == 0); /* network hands off */ LASSERT(desc->bd_md_count == 0); /* network hands off */
LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL)); LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
LASSERT(desc->bd_frag_ops);
sptlrpc_enc_pool_put_pages(desc); if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
sptlrpc_enc_pool_put_pages(desc);
if (desc->bd_export) if (desc->bd_export)
class_export_put(desc->bd_export); class_export_put(desc->bd_export);
else else
class_import_put(desc->bd_import); class_import_put(desc->bd_import);
if (unpin) { if (desc->bd_frag_ops->release_frags)
for (i = 0; i < desc->bd_iov_count; i++) desc->bd_frag_ops->release_frags(desc);
put_page(desc->bd_iov[i].bv_page);
} if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
kfree(GET_KIOV(desc));
else
kfree(GET_KVEC(desc));
kfree(desc); kfree(desc);
} }
EXPORT_SYMBOL(__ptlrpc_free_bulk); EXPORT_SYMBOL(ptlrpc_free_bulk);
/** /**
* Set server timelimit for this req, i.e. how long are we willing to wait * Set server timelimit for this req, i.e. how long are we willing to wait
...@@ -2266,7 +2329,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) ...@@ -2266,7 +2329,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
request->rq_import = NULL; request->rq_import = NULL;
} }
if (request->rq_bulk) if (request->rq_bulk)
ptlrpc_free_bulk_pin(request->rq_bulk); ptlrpc_free_bulk(request->rq_bulk);
if (request->rq_reqbuf || request->rq_clrbuf) if (request->rq_reqbuf || request->rq_clrbuf)
sptlrpc_cli_free_reqbuf(request); sptlrpc_cli_free_reqbuf(request);
......
...@@ -182,9 +182,9 @@ void client_bulk_callback(lnet_event_t *ev) ...@@ -182,9 +182,9 @@ void client_bulk_callback(lnet_event_t *ev)
struct ptlrpc_bulk_desc *desc = cbid->cbid_arg; struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
struct ptlrpc_request *req; struct ptlrpc_request *req;
LASSERT((desc->bd_type == BULK_PUT_SINK && LASSERT((ptlrpc_is_bulk_put_sink(desc->bd_type) &&
ev->type == LNET_EVENT_PUT) || ev->type == LNET_EVENT_PUT) ||
(desc->bd_type == BULK_GET_SOURCE && (ptlrpc_is_bulk_get_source(desc->bd_type) &&
ev->type == LNET_EVENT_GET) || ev->type == LNET_EVENT_GET) ||
ev->type == LNET_EVENT_UNLINK); ev->type == LNET_EVENT_UNLINK);
LASSERT(ev->unlinked); LASSERT(ev->unlinked);
......
...@@ -127,8 +127,7 @@ static int ptlrpc_register_bulk(struct ptlrpc_request *req) ...@@ -127,8 +127,7 @@ static int ptlrpc_register_bulk(struct ptlrpc_request *req)
LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT); LASSERT(desc->bd_md_max_brw <= PTLRPC_BULK_OPS_COUNT);
LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES); LASSERT(desc->bd_iov_count <= PTLRPC_MAX_BRW_PAGES);
LASSERT(desc->bd_req); LASSERT(desc->bd_req);
LASSERT(desc->bd_type == BULK_PUT_SINK || LASSERT(ptlrpc_is_bulk_op_passive(desc->bd_type));
desc->bd_type == BULK_GET_SOURCE);
/* cleanup the state of the bulk for it will be reused */ /* cleanup the state of the bulk for it will be reused */
if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY) if (req->rq_resend || req->rq_send_state == LUSTRE_IMP_REPLAY)
...@@ -168,7 +167,7 @@ static int ptlrpc_register_bulk(struct ptlrpc_request *req) ...@@ -168,7 +167,7 @@ static int ptlrpc_register_bulk(struct ptlrpc_request *req)
for (posted_md = 0; posted_md < total_md; posted_md++, xid++) { for (posted_md = 0; posted_md < total_md; posted_md++, xid++) {
md.options = PTLRPC_MD_OPTIONS | md.options = PTLRPC_MD_OPTIONS |
((desc->bd_type == BULK_GET_SOURCE) ? (ptlrpc_is_bulk_op_get(desc->bd_type) ?
LNET_MD_OP_GET : LNET_MD_OP_PUT); LNET_MD_OP_GET : LNET_MD_OP_PUT);
ptlrpc_fill_bulk_md(&md, desc, posted_md); ptlrpc_fill_bulk_md(&md, desc, posted_md);
...@@ -223,7 +222,7 @@ static int ptlrpc_register_bulk(struct ptlrpc_request *req) ...@@ -223,7 +222,7 @@ static int ptlrpc_register_bulk(struct ptlrpc_request *req)
CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, xid x%#llx-%#llx, portal %u\n", CDEBUG(D_NET, "Setup %u bulk %s buffers: %u pages %u bytes, xid x%#llx-%#llx, portal %u\n",
desc->bd_md_count, desc->bd_md_count,
desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink", ptlrpc_is_bulk_op_get(desc->bd_type) ? "get-source" : "put-sink",
desc->bd_iov_count, desc->bd_nob, desc->bd_iov_count, desc->bd_nob,
desc->bd_last_xid, req->rq_xid, desc->bd_portal); desc->bd_last_xid, req->rq_xid, desc->bd_portal);
......
...@@ -43,6 +43,8 @@ ...@@ -43,6 +43,8 @@
void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
int mdidx) int mdidx)
{ {
int offset = mdidx * LNET_MAX_IOV;
CLASSERT(PTLRPC_MAX_BRW_PAGES < LI_POISON); CLASSERT(PTLRPC_MAX_BRW_PAGES < LI_POISON);
LASSERT(mdidx < desc->bd_md_max_brw); LASSERT(mdidx < desc->bd_md_max_brw);
...@@ -50,23 +52,20 @@ void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, ...@@ -50,23 +52,20 @@ void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LASSERT(!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV |
LNET_MD_PHYS))); LNET_MD_PHYS)));
md->options |= LNET_MD_KIOV;
md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV); md->length = max(0, desc->bd_iov_count - mdidx * LNET_MAX_IOV);
md->length = min_t(unsigned int, LNET_MAX_IOV, md->length); md->length = min_t(unsigned int, LNET_MAX_IOV, md->length);
if (desc->bd_enc_iov)
md->start = &desc->bd_enc_iov[mdidx * LNET_MAX_IOV];
else
md->start = &desc->bd_iov[mdidx * LNET_MAX_IOV];
}
void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page,
int pageoffset, int len)
{
lnet_kiov_t *kiov = &desc->bd_iov[desc->bd_iov_count];
kiov->bv_page = page;
kiov->bv_offset = pageoffset;
kiov->bv_len = len;
desc->bd_iov_count++; if (ptlrpc_is_bulk_desc_kiov(desc->bd_type)) {
md->options |= LNET_MD_KIOV;
if (GET_ENC_KIOV(desc))
md->start = &BD_GET_ENC_KIOV(desc, offset);
else
md->start = &BD_GET_KIOV(desc, offset);
} else {
md->options |= LNET_MD_IOVEC;
if (GET_ENC_KVEC(desc))
md->start = &BD_GET_ENC_KVEC(desc, offset);
else
md->start = &BD_GET_KVEC(desc, offset);
}
} }
...@@ -55,8 +55,11 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc); ...@@ -55,8 +55,11 @@ int ptlrpcd_start(struct ptlrpcd_ctl *pc);
/* client.c */ /* client.c */
void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
unsigned int service_time); unsigned int service_time);
struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
unsigned type, unsigned portal); unsigned int max_brw,
enum ptlrpc_bulk_op_type type,
unsigned int portal,
const struct ptlrpc_bulk_frag_ops *ops);
int ptlrpc_request_cache_init(void); int ptlrpc_request_cache_init(void);
void ptlrpc_request_cache_fini(void); void ptlrpc_request_cache_fini(void);
struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags); struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags);
...@@ -226,8 +229,6 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink); ...@@ -226,8 +229,6 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink);
/* pers.c */ /* pers.c */
void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
int mdcnt); int mdcnt);
void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page,
int pageoffset, int len);
/* pack_generic.c */ /* pack_generic.c */
struct ptlrpc_reply_state * struct ptlrpc_reply_state *
......
...@@ -311,7 +311,9 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) ...@@ -311,7 +311,9 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
int p_idx, g_idx; int p_idx, g_idx;
int i; int i;
if (!desc->bd_enc_iov) LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
if (!GET_ENC_KIOV(desc))
return; return;
LASSERT(desc->bd_iov_count > 0); LASSERT(desc->bd_iov_count > 0);
...@@ -326,12 +328,12 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) ...@@ -326,12 +328,12 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
LASSERT(page_pools.epp_pools[p_idx]); LASSERT(page_pools.epp_pools[p_idx]);
for (i = 0; i < desc->bd_iov_count; i++) { for (i = 0; i < desc->bd_iov_count; i++) {
LASSERT(desc->bd_enc_iov[i].bv_page); LASSERT(BD_GET_ENC_KIOV(desc, i).bv_page);
LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]); LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
LASSERT(!page_pools.epp_pools[p_idx][g_idx]); LASSERT(!page_pools.epp_pools[p_idx][g_idx]);
page_pools.epp_pools[p_idx][g_idx] = page_pools.epp_pools[p_idx][g_idx] =
desc->bd_enc_iov[i].bv_page; BD_GET_ENC_KIOV(desc, i).bv_page;
if (++g_idx == PAGES_PER_POOL) { if (++g_idx == PAGES_PER_POOL) {
p_idx++; p_idx++;
...@@ -345,8 +347,8 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) ...@@ -345,8 +347,8 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
spin_unlock(&page_pools.epp_lock); spin_unlock(&page_pools.epp_lock);
kfree(desc->bd_enc_iov); kfree(GET_ENC_KIOV(desc));
desc->bd_enc_iov = NULL; GET_ENC_KIOV(desc) = NULL;
} }
static inline void enc_pools_alloc(void) static inline void enc_pools_alloc(void)
...@@ -520,10 +522,11 @@ int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg, ...@@ -520,10 +522,11 @@ int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
hashsize = cfs_crypto_hash_digestsize(cfs_hash_alg_id[alg]); hashsize = cfs_crypto_hash_digestsize(cfs_hash_alg_id[alg]);
for (i = 0; i < desc->bd_iov_count; i++) { for (i = 0; i < desc->bd_iov_count; i++) {
cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].bv_page, cfs_crypto_hash_update_page(hdesc,
desc->bd_iov[i].bv_offset & BD_GET_KIOV(desc, i).bv_page,
BD_GET_KIOV(desc, i).bv_offset &
~PAGE_MASK, ~PAGE_MASK,
desc->bd_iov[i].bv_len); BD_GET_KIOV(desc, i).bv_len);
} }
if (hashsize > buflen) { if (hashsize > buflen) {
......
...@@ -153,14 +153,16 @@ static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc) ...@@ -153,14 +153,16 @@ static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc)
char *ptr; char *ptr;
unsigned int off, i; unsigned int off, i;
LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
for (i = 0; i < desc->bd_iov_count; i++) { for (i = 0; i < desc->bd_iov_count; i++) {
if (desc->bd_iov[i].bv_len == 0) if (!BD_GET_KIOV(desc, i).bv_len)
continue; continue;
ptr = kmap(desc->bd_iov[i].bv_page); ptr = kmap(BD_GET_KIOV(desc, i).bv_page);
off = desc->bd_iov[i].bv_offset & ~PAGE_MASK; off = BD_GET_KIOV(desc, i).bv_offset & ~PAGE_MASK;
ptr[off] ^= 0x1; ptr[off] ^= 0x1;
kunmap(desc->bd_iov[i].bv_page); kunmap(BD_GET_KIOV(desc, i).bv_page);
return; return;
} }
} }
...@@ -352,11 +354,11 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx, ...@@ -352,11 +354,11 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
/* fix the actual data size */ /* fix the actual data size */
for (i = 0, nob = 0; i < desc->bd_iov_count; i++) { for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
if (desc->bd_iov[i].bv_len + nob > desc->bd_nob_transferred) { struct bio_vec bv_desc = BD_GET_KIOV(desc, i);
desc->bd_iov[i].bv_len =
desc->bd_nob_transferred - nob; if (bv_desc.bv_len + nob > desc->bd_nob_transferred)
} bv_desc.bv_len = desc->bd_nob_transferred - nob;
nob += desc->bd_iov[i].bv_len; nob += bv_desc.bv_len;
} }
rc = plain_verify_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg, rc = plain_verify_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment