Commit e93a3082 authored by Andrew Perepechko's avatar Andrew Perepechko Committed by Greg Kroah-Hartman

lustre/xattr: separate ACL and XATTR caches

This patch separates ACL and XATTR caches, so that
when updating an ACL only LOOKUP lock is needed and
when updating another XATTR only XATTR lock is needed.

This patch also reverts XATTR cache support for setxattr
because client performing REINT under even PR lock
will deadlock if an active server operation (like unlink)
attempts to cancel all locks, and setxattr has to wait
for it (MDC max-in-flight is 1).

This patch disables the r/o cache if the data is
unreasonably large (larger than maximum single EA
size).
Signed-off-by: default avatarAndrew Perepechko <andrew_perepechko@xyratex.com>
Signed-off-by: default avatarNathaniel Clark <nathaniel.l.clark@intel.com>
Reviewed-on: http://review.whamcloud.com/7208
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-3669Reviewed-by: default avatarAndreas Dilger <andreas.dilger@intel.com>
Reviewed-by: default avatarJohn L. Hammond <john.hammond@intel.com>
Signed-off-by: default avatarOleg Drokin <oleg.drokin@intel.com>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent 08a78a27
...@@ -1747,7 +1747,6 @@ static inline __u32 lov_mds_md_size(__u16 stripes, __u32 lmm_magic) ...@@ -1747,7 +1747,6 @@ static inline __u32 lov_mds_md_size(__u16 stripes, __u32 lmm_magic)
OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLNLINK | \ OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLNLINK | \
OBD_MD_FLGENER | OBD_MD_FLRDEV | OBD_MD_FLGROUP) OBD_MD_FLGENER | OBD_MD_FLRDEV | OBD_MD_FLGROUP)
#define OBD_MD_FLXATTRLOCKED OBD_MD_FLGETATTRLOCK
#define OBD_MD_FLXATTRALL (OBD_MD_FLXATTR | OBD_MD_FLXATTRLS) #define OBD_MD_FLXATTRALL (OBD_MD_FLXATTR | OBD_MD_FLXATTRLS)
/* don't forget obdo_fid which is way down at the bottom so it can /* don't forget obdo_fid which is way down at the bottom so it can
......
...@@ -145,8 +145,6 @@ char *ldlm_it2str(int it) ...@@ -145,8 +145,6 @@ char *ldlm_it2str(int it)
return "getxattr"; return "getxattr";
case IT_LAYOUT: case IT_LAYOUT:
return "layout"; return "layout";
case IT_SETXATTR:
return "setxattr";
default: default:
CERROR("Unknown intent %d\n", it); CERROR("Unknown intent %d\n", it);
return "UNKNOWN"; return "UNKNOWN";
......
...@@ -296,13 +296,6 @@ int ll_xattr_cache_get(struct inode *inode, ...@@ -296,13 +296,6 @@ int ll_xattr_cache_get(struct inode *inode,
size_t size, size_t size,
__u64 valid); __u64 valid);
int ll_xattr_cache_update(struct inode *inode,
const char *name,
const char *newval,
size_t size,
__u64 valid,
int flags);
/* /*
* Locking to guarantee consistency of non-atomic updates to long long i_size, * Locking to guarantee consistency of non-atomic updates to long long i_size,
* consistency between file size and KMS. * consistency between file size and KMS.
......
...@@ -183,17 +183,11 @@ int ll_setxattr_common(struct inode *inode, const char *name, ...@@ -183,17 +183,11 @@ int ll_setxattr_common(struct inode *inode, const char *name,
valid |= rce_ops2valid(rce->rce_ops); valid |= rce_ops2valid(rce->rce_ops);
} }
#endif #endif
if (sbi->ll_xattr_cache_enabled &&
(rce == NULL || rce->rce_ops == RMT_LSETFACL)) {
rc = ll_xattr_cache_update(inode, name, pv, size, valid, flags);
} else {
oc = ll_mdscapa_get(inode); oc = ll_mdscapa_get(inode);
rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
valid, name, pv, size, 0, flags, valid, name, pv, size, 0, flags,
ll_i2suppgid(inode), &req); ll_i2suppgid(inode), &req);
capa_put(oc); capa_put(oc);
}
#ifdef CONFIG_FS_POSIX_ACL #ifdef CONFIG_FS_POSIX_ACL
if (new_value != NULL) if (new_value != NULL)
lustre_posix_acl_xattr_free(new_value, size); lustre_posix_acl_xattr_free(new_value, size);
...@@ -292,6 +286,7 @@ int ll_getxattr_common(struct inode *inode, const char *name, ...@@ -292,6 +286,7 @@ int ll_getxattr_common(struct inode *inode, const char *name,
void *xdata; void *xdata;
struct obd_capa *oc; struct obd_capa *oc;
struct rmtacl_ctl_entry *rce = NULL; struct rmtacl_ctl_entry *rce = NULL;
struct ll_inode_info *lli = ll_i2info(inode);
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n",
inode->i_ino, inode->i_generation, inode); inode->i_ino, inode->i_generation, inode);
...@@ -339,7 +334,7 @@ int ll_getxattr_common(struct inode *inode, const char *name, ...@@ -339,7 +334,7 @@ int ll_getxattr_common(struct inode *inode, const char *name,
*/ */
if (xattr_type == XATTR_ACL_ACCESS_T && if (xattr_type == XATTR_ACL_ACCESS_T &&
!(sbi->ll_flags & LL_SBI_RMT_CLIENT)) { !(sbi->ll_flags & LL_SBI_RMT_CLIENT)) {
struct ll_inode_info *lli = ll_i2info(inode);
struct posix_acl *acl; struct posix_acl *acl;
spin_lock(&lli->lli_lock); spin_lock(&lli->lli_lock);
...@@ -358,13 +353,27 @@ int ll_getxattr_common(struct inode *inode, const char *name, ...@@ -358,13 +353,27 @@ int ll_getxattr_common(struct inode *inode, const char *name,
#endif #endif
do_getxattr: do_getxattr:
if (sbi->ll_xattr_cache_enabled && (rce == NULL || if (sbi->ll_xattr_cache_enabled && xattr_type != XATTR_ACL_ACCESS_T) {
rce->rce_ops == RMT_LGETFACL ||
rce->rce_ops == RMT_LSETFACL)) {
rc = ll_xattr_cache_get(inode, name, buffer, size, valid); rc = ll_xattr_cache_get(inode, name, buffer, size, valid);
if (rc == -EAGAIN)
goto getxattr_nocache;
if (rc < 0) if (rc < 0)
GOTO(out_xattr, rc); GOTO(out_xattr, rc);
/* Add "system.posix_acl_access" to the list */
if (lli->lli_posix_acl != NULL && valid & OBD_MD_FLXATTRLS) {
if (size == 0) {
rc += sizeof(XATTR_NAME_ACL_ACCESS);
} else if (size - rc >= sizeof(XATTR_NAME_ACL_ACCESS)) {
memcpy(buffer + rc, XATTR_NAME_ACL_ACCESS,
sizeof(XATTR_NAME_ACL_ACCESS));
rc += sizeof(XATTR_NAME_ACL_ACCESS);
} else {
GOTO(out_xattr, rc = -ERANGE);
}
}
} else { } else {
getxattr_nocache:
oc = ll_mdscapa_get(inode); oc = ll_mdscapa_get(inode);
rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc, rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
valid | (rce ? rce_ops2valid(rce->rce_ops) : 0), valid | (rce ? rce_ops2valid(rce->rce_ops) : 0),
......
...@@ -98,13 +98,13 @@ static int ll_xattr_cache_find(struct list_head *cache, ...@@ -98,13 +98,13 @@ static int ll_xattr_cache_find(struct list_head *cache,
} }
/** /**
* This adds or updates an xattr. * This adds an xattr.
* *
* Add @xattr_name attr with @xattr_val value and @xattr_val_len length, * Add @xattr_name attr with @xattr_val value and @xattr_val_len length,
* if the attribute already exists, then update its value.
* *
* \retval 0 success * \retval 0 success
* \retval -ENOMEM if no memory could be allocated for the cached attr * \retval -ENOMEM if no memory could be allocated for the cached attr
* \retval -EPROTO if duplicate xattr is being added
*/ */
static int ll_xattr_cache_add(struct list_head *cache, static int ll_xattr_cache_add(struct list_head *cache,
const char *xattr_name, const char *xattr_name,
...@@ -116,27 +116,8 @@ static int ll_xattr_cache_add(struct list_head *cache, ...@@ -116,27 +116,8 @@ static int ll_xattr_cache_add(struct list_head *cache,
if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) { if (ll_xattr_cache_find(cache, xattr_name, &xattr) == 0) {
/* Found a cached EA, update it */ CDEBUG(D_CACHE, "duplicate xattr: [%s]\n", xattr_name);
return -EPROTO;
if (xattr_val_len != xattr->xe_vallen) {
char *val;
OBD_ALLOC(val, xattr_val_len);
if (val == NULL) {
CDEBUG(D_CACHE,
"failed to allocate %u bytes for xattr %s update\n",
xattr_val_len, xattr_name);
return -ENOMEM;
}
OBD_FREE(xattr->xe_value, xattr->xe_vallen);
xattr->xe_value = val;
xattr->xe_vallen = xattr_val_len;
}
memcpy(xattr->xe_value, xattr_val, xattr_val_len);
CDEBUG(D_CACHE, "update: [%s]=%.*s\n", xattr_name,
xattr_val_len, xattr_val);
return 0;
} }
OBD_SLAB_ALLOC_PTR_GFP(xattr, xattr_kmem, __GFP_IO); OBD_SLAB_ALLOC_PTR_GFP(xattr, xattr_kmem, __GFP_IO);
...@@ -292,7 +273,7 @@ int ll_xattr_cache_destroy(struct inode *inode) ...@@ -292,7 +273,7 @@ int ll_xattr_cache_destroy(struct inode *inode)
} }
/** /**
* Match or enqueue a PR or PW LDLM lock. * Match or enqueue a PR lock.
* *
* Find or request an LDLM lock with xattr data. * Find or request an LDLM lock with xattr data.
* Since LDLM does not provide API for atomic match_or_enqueue, * Since LDLM does not provide API for atomic match_or_enqueue,
...@@ -322,9 +303,7 @@ static int ll_xattr_find_get_lock(struct inode *inode, ...@@ -322,9 +303,7 @@ static int ll_xattr_find_get_lock(struct inode *inode,
mutex_lock(&lli->lli_xattrs_enq_lock); mutex_lock(&lli->lli_xattrs_enq_lock);
/* Try matching first. */ /* Try matching first. */
mode = ll_take_md_lock(inode, MDS_INODELOCK_XATTR, &lockh, 0, mode = ll_take_md_lock(inode, MDS_INODELOCK_XATTR, &lockh, 0, LCK_PR);
oit->it_op == IT_SETXATTR ? LCK_PW :
(LCK_PR | LCK_PW));
if (mode != 0) { if (mode != 0) {
/* fake oit in mdc_revalidate_lock() manner */ /* fake oit in mdc_revalidate_lock() manner */
oit->d.lustre.it_lock_handle = lockh.cookie; oit->d.lustre.it_lock_handle = lockh.cookie;
...@@ -340,13 +319,7 @@ static int ll_xattr_find_get_lock(struct inode *inode, ...@@ -340,13 +319,7 @@ static int ll_xattr_find_get_lock(struct inode *inode,
return PTR_ERR(op_data); return PTR_ERR(op_data);
} }
op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS | op_data->op_valid = OBD_MD_FLXATTR | OBD_MD_FLXATTRLS;
OBD_MD_FLXATTRLOCKED;
#ifdef CONFIG_FS_POSIX_ACL
/* If working with ACLs, we would like to cache local ACLs */
if (sbi->ll_flags & LL_SBI_RMT_CLIENT)
op_data->op_valid |= OBD_MD_FLRMTLGETFACL;
#endif
rc = md_enqueue(exp, &einfo, oit, op_data, &lockh, NULL, 0, NULL, 0); rc = md_enqueue(exp, &einfo, oit, op_data, &lockh, NULL, 0, NULL, 0);
ll_finish_md_op_data(op_data); ll_finish_md_op_data(op_data);
...@@ -409,7 +382,11 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit) ...@@ -409,7 +382,11 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
if (oit->d.lustre.it_status < 0) { if (oit->d.lustre.it_status < 0) {
CDEBUG(D_CACHE, "getxattr intent returned %d for fid "DFID"\n", CDEBUG(D_CACHE, "getxattr intent returned %d for fid "DFID"\n",
oit->d.lustre.it_status, PFID(ll_inode2fid(inode))); oit->d.lustre.it_status, PFID(ll_inode2fid(inode)));
GOTO(out_destroy, rc = oit->d.lustre.it_status); rc = oit->d.lustre.it_status;
/* xattr data is so large that we don't want to cache it */
if (rc == -ERANGE)
rc = -EAGAIN;
GOTO(out_destroy, rc);
} }
body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
...@@ -447,6 +424,11 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit) ...@@ -447,6 +424,11 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
rc = -EPROTO; rc = -EPROTO;
} else if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_XATTR_ENOMEM)) { } else if (OBD_FAIL_CHECK(OBD_FAIL_LLITE_XATTR_ENOMEM)) {
rc = -ENOMEM; rc = -ENOMEM;
} else if (!strcmp(xdata, XATTR_NAME_ACL_ACCESS)) {
/* Filter out ACL ACCESS since it's cached separately */
CDEBUG(D_CACHE, "not caching %s\n",
XATTR_NAME_ACL_ACCESS);
rc = 0;
} else { } else {
rc = ll_xattr_cache_add(&lli->lli_xattrs, xdata, xval, rc = ll_xattr_cache_add(&lli->lli_xattrs, xdata, xval,
*xsizes); *xsizes);
...@@ -467,8 +449,7 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit) ...@@ -467,8 +449,7 @@ static int ll_xattr_cache_refill(struct inode *inode, struct lookup_intent *oit)
GOTO(out_maybe_drop, rc); GOTO(out_maybe_drop, rc);
out_maybe_drop: out_maybe_drop:
/* drop lock on error or getxattr */
if (rc != 0 || oit->it_op != IT_SETXATTR)
ll_intent_drop_lock(oit); ll_intent_drop_lock(oit);
if (rc != 0) if (rc != 0)
...@@ -553,65 +534,3 @@ int ll_xattr_cache_get(struct inode *inode, ...@@ -553,65 +534,3 @@ int ll_xattr_cache_get(struct inode *inode,
return rc; return rc;
} }
/**
* Set/update an xattr value or remove xattr using the write-through cache.
*
* Set/update the xattr value (if @valid has OBD_MD_FLXATTR) of @name to @newval
* or
* remove the xattr @name (@valid has OBD_MD_FLXATTRRM set) from @inode.
* @flags is either XATTR_CREATE or XATTR_REPLACE as defined by setxattr(2)
*
* \retval 0 no error occured
* \retval -EPROTO network protocol error
* \retval -ENOMEM not enough memory for the cache
* \retval -ERANGE the buffer is not large enough
* \retval -ENODATA no such attr (in the removal case)
*/
int ll_xattr_cache_update(struct inode *inode,
const char *name,
const char *newval,
size_t size,
__u64 valid,
int flags)
{
struct lookup_intent oit = { .it_op = IT_SETXATTR };
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ptlrpc_request *req = NULL;
struct ll_inode_info *lli = ll_i2info(inode);
struct obd_capa *oc;
int rc;
LASSERT(!!(valid & OBD_MD_FLXATTR) ^ !!(valid & OBD_MD_FLXATTRRM));
rc = ll_xattr_cache_refill(inode, &oit);
if (rc)
return rc;
oc = ll_mdscapa_get(inode);
rc = md_setxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
valid | OBD_MD_FLXATTRLOCKED, name, newval,
size, 0, flags, ll_i2suppgid(inode), &req);
capa_put(oc);
if (rc) {
ll_intent_drop_lock(&oit);
GOTO(out, rc);
}
if (valid & OBD_MD_FLXATTR)
rc = ll_xattr_cache_add(&lli->lli_xattrs, name, newval, size);
else if (valid & OBD_MD_FLXATTRRM)
rc = ll_xattr_cache_del(&lli->lli_xattrs, name);
ll_intent_drop_lock(&oit);
GOTO(out, rc);
out:
up_write(&lli->lli_xattrs_list_rwsem);
ptlrpc_req_finished(req);
return rc;
}
...@@ -101,7 +101,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, ...@@ -101,7 +101,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh, void *lmm, int lmmsize, struct lustre_handle *lockh, void *lmm, int lmmsize,
struct ptlrpc_request **req, __u64 extra_lock_flags); struct ptlrpc_request **req, __u64 extra_lock_flags);
int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
struct list_head *cancels, ldlm_mode_t mode, struct list_head *cancels, ldlm_mode_t mode,
__u64 bits); __u64 bits);
/* mdc/mdc_request.c */ /* mdc/mdc_request.c */
......
...@@ -378,13 +378,6 @@ mdc_intent_getxattr_pack(struct obd_export *exp, ...@@ -378,13 +378,6 @@ mdc_intent_getxattr_pack(struct obd_export *exp,
mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
if (it->it_op == IT_SETXATTR)
/* If we want to upgrade to LCK_PW, let's cancel LCK_PR
* locks now. This avoids unnecessary ASTs. */
count = mdc_resource_get_unused(exp, &op_data->op_fid1,
&cancels, LCK_PW,
MDS_INODELOCK_XATTR);
rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
if (rc) { if (rc) {
ptlrpc_request_free(req); ptlrpc_request_free(req);
...@@ -842,7 +835,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, ...@@ -842,7 +835,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
return -EOPNOTSUPP; return -EOPNOTSUPP;
req = mdc_intent_layout_pack(exp, it, op_data); req = mdc_intent_layout_pack(exp, it, op_data);
lvb_type = LVB_T_LAYOUT; lvb_type = LVB_T_LAYOUT;
} else if (it->it_op & (IT_GETXATTR | IT_SETXATTR)) { } else if (it->it_op & IT_GETXATTR) {
req = mdc_intent_getxattr_pack(exp, it, op_data); req = mdc_intent_getxattr_pack(exp, it, op_data);
} else { } else {
LBUG(); LBUG();
......
...@@ -66,7 +66,7 @@ static int mdc_reint(struct ptlrpc_request *request, ...@@ -66,7 +66,7 @@ static int mdc_reint(struct ptlrpc_request *request,
/* Find and cancel locally locks matched by inode @bits & @mode in the resource /* Find and cancel locally locks matched by inode @bits & @mode in the resource
* found by @fid. Found locks are added into @cancel list. Returns the amount of * found by @fid. Found locks are added into @cancel list. Returns the amount of
* locks added to @cancels list. */ * locks added to @cancels list. */
int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid, int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
struct list_head *cancels, ldlm_mode_t mode, struct list_head *cancels, ldlm_mode_t mode,
__u64 bits) __u64 bits)
{ {
......
...@@ -355,10 +355,32 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, ...@@ -355,10 +355,32 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
input_size); input_size);
} }
rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode); /* Flush local XATTR locks to get rid of a possible cancel RPC */
if (rc) { if (opcode == MDS_REINT && fid_is_sane(fid) &&
ptlrpc_request_free(req); exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) {
return rc; LIST_HEAD(cancels);
int count;
/* Without that packing would fail */
if (input_size == 0)
req_capsule_set_size(&req->rq_pill, &RMF_EADATA,
RCL_CLIENT, 0);
count = mdc_resource_get_unused(exp, fid,
&cancels, LCK_EX,
MDS_INODELOCK_XATTR);
rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
if (rc) {
ptlrpc_request_free(req);
return rc;
}
} else {
rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
if (rc) {
ptlrpc_request_free(req);
return rc;
}
} }
if (opcode == MDS_REINT) { if (opcode == MDS_REINT) {
......
...@@ -295,7 +295,8 @@ static const struct req_msg_field *mds_reint_setxattr_client[] = { ...@@ -295,7 +295,8 @@ static const struct req_msg_field *mds_reint_setxattr_client[] = {
&RMF_REC_REINT, &RMF_REC_REINT,
&RMF_CAPA1, &RMF_CAPA1,
&RMF_NAME, &RMF_NAME,
&RMF_EADATA &RMF_EADATA,
&RMF_DLM_REQ
}; };
static const struct req_msg_field *mdt_swap_layouts[] = { static const struct req_msg_field *mdt_swap_layouts[] = {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment